import uuid
from datetime import datetime
import random
import json
# Message types the generator can emit; each transaction receives one of each.
msg_types = [
    'category',
    'location',
    'risk',
]

# Sample values drawn at random when building a message payload.
cat = [
    'travel',
    'entertainment',
    'groceries',
    'health',
    'government/taxes',
]
loc = [
    'beach',
    'airport',
    'mall',
    'rural',
    'home',
]
country = [
    'France',
    'Italy',
    'Spain',
    'Greece',
    'N/A',
]
def gen_msg(attr: str) -> dict:
    """Build the random payload for one message type.

    Args:
        attr: The message type. 'category' and 'location' are handled
            explicitly; any other value (in practice 'risk') produces a
            risk payload.

    Returns:
        A dict holding the attribute values for that message type:
        - 'category': a random category and a confidence rounded to 1 decimal.
        - 'location': a random location and a random country.
        - otherwise: a risk score rounded to 2 decimals and a confidence
          rounded to 1 decimal.
    """
    if attr == 'category':
        return {
            'category': random.choice(cat),
            'category_confidence': round(random.random(), 1),
        }
    if attr == 'location':
        return {
            'location': random.choice(loc),
            'country': random.choice(country),
        }
    # Fallback branch: everything that is not category/location is a risk message.
    return {
        'risk': round(random.random(), 2),
        'risk_confidence': round(random.random(), 1),
    }
# In-flight transactions: transaction_id -> {message type -> payload already sent}.
events = dict()
def gen_event_id() -> str:
    """Return a new short transaction id: the first 8 hex digits of a random UUID."""
    # .hex is the dash-free form; its first 8 characters equal those of str(uuid).
    return uuid.uuid4().hex[:8]
def add_msg(transaction_id):
    """Generate one not-yet-sent message type for a transaction and record it.

    Args:
        transaction_id: Key into the module-level ``events`` dict. An unknown
            id gets a new, empty entry.

    Returns:
        The message type that was picked; its payload is stored in
        ``events[transaction_id][<type>]``.

    Raises:
        IndexError: If every type in ``msg_types`` has already been sent for
            this transaction (``random.choice`` on an empty list).
    """
    # Payloads already sent for this transaction (created on first use).
    sent = events.setdefault(transaction_id, {})
    # Message types not sent yet, kept in msg_types order.
    missing_attr = [a for a in msg_types if a not in sent]
    # Pick a message type not sent yet and create the corresponding message.
    new_attr = random.choice(missing_attr)
    sent[new_attr] = gen_msg(new_attr)
    return new_attr
def gen():
    """Emit one message for a new or an in-flight transaction.

    Picks a transaction id, adds one missing attribute message to it, and
    sends the message twice through ``api``: once on the port named after the
    message type and once on the "alloutput" debugging port. A transaction
    that has received every message type is removed from ``events``.

    NOTE(review): ``api`` is not defined in this file; presumably it is
    injected by the hosting operator runtime. Confirm.
    """
    # Create a new event id while at most 15 transactions are in flight;
    # otherwise reuse one of the in-flight ids.
    if len(events) <= 15:
        transaction_id = gen_event_id()
    else:
        transaction_id = random.choice(list(events))

    # Add an attribute to it.
    new_attr = add_msg(transaction_id)

    # Prepare a message by adding the timestamp and the transaction_id.
    mymsg = {'transaction_id': transaction_id, 'ts': datetime.now().isoformat()}
    mymsg.update(events[transaction_id][new_attr])

    # Send a JSON object inside an array because the streaming operator only
    # accepts JSON arrays.
    api.send(new_attr, json.dumps([mymsg]))
    # Send mymsg to the debugging port.
    api.send("alloutput", json.dumps(mymsg))

    # If all the attributes are set for an event, delete it from the queue.
    if len(events[transaction_id]) == len(msg_types):
        del events[transaction_id]
# 2 messages per second: gen is passed to api.add_timer with a "0.5s" period.
# NOTE(review): api is not defined in this file; presumably supplied by the operator runtime.
api.add_timer("0.5s", gen)
The operator is configured with JSON as its input/output format and with no opcodes.
// displaySequence: return the order in which the three messages arrived,
// earliest first, as a comma-separated string built from the first letter of
// location (L), category (C) and risk (R).
// Parameters: loc_ts, cat_ts, risk_ts - timestamps of the three messages.
// Equal timestamps are not reported as ties; they fall through the branch
// order below (e.g. loc_ts is tested first, and cat_ts = risk_ts yields 'L,R,C').
DECLARE string displaySequence (bigdatetime loc_ts, bigdatetime cat_ts, bigdatetime risk_ts) {
// the earliest timestamp decides the first letter
bigdatetime min_ts := minof(loc_ts, cat_ts, risk_ts);
// location arrived first: order category and risk
IF (min_ts=loc_ts)
IF (cat_ts<risk_ts)
return 'L,C,R';
else
return 'L,R,C';
// category arrived first: order location and risk
ELSE IF (min_ts=cat_ts)
IF (loc_ts<risk_ts)
return 'C,L,R';
ELSE
return 'C,R,L';
// otherwise risk arrived first: order location and category
ELSE
if (loc_ts<cat_ts)
return 'R,L,C';
else
return 'R,C,L';
}
END;
// Input stream LOC: location messages, one row per transaction_id.
// ts is the message timestamp used later to order and time the arrivals.
// NOTE(review): presumably fed by the generator's 'location' messages - confirm the wiring.
CREATE INPUT STREAM LOC SCHEMA (
transaction_id string,
location string,
country string,
ts bigdatetime
);
// Input stream CAT: category messages, one row per transaction_id.
// ts is the message timestamp used later to order and time the arrivals.
// NOTE(review): presumably fed by the generator's 'category' messages - confirm the wiring.
CREATE INPUT STREAM CAT SCHEMA (
transaction_id string,
category string,
category_confidence float,
ts bigdatetime
);
// Input stream RISK: risk messages, one row per transaction_id.
// ts is the message timestamp used later to order and time the arrivals.
// NOTE(review): presumably fed by the generator's 'risk' messages - confirm the wiring.
CREATE INPUT STREAM RISK SCHEMA (
transaction_id string,
risk float,
risk_confidence float,
ts bigdatetime
);
// Output stream COMPLETE: combines the LOC, CAT and RISK rows that share a
// transaction_id into a single row, together with the spread between the
// first and last message and the order in which they arrived.
create output stream COMPLETE as
select LOC.transaction_id,
LOC.location,
CAT.category,
RISK.risk,
//compute the time delta between the first and the last event
//convert the result from microseconds into seconds
//round to 2 digits
round((maxof(LOC.ts, RISK.ts, CAT.ts) -
minof(LOC.ts, RISK.ts, CAT.ts))/(1000.*1000), 2) as time_diff_s,
//use a custom function to display the sequence order
displaySequence( LOC.ts, CAT.ts, RISK.ts) as sequence_order
FROM LOC, CAT, RISK
//pattern: one LOC, one CAT and one RISK event within a 1 minute interval
//NOTE(review): confirm the exact interval and ordering semantics against the CCL MATCHING reference
MATCHING [1 minute : LOC && CAT && RISK]
//all three events must carry the same transaction_id
on LOC.transaction_id = CAT.transaction_id = RISK.transaction_id ;
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
Subject | Kudos |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
User | Count |
---|---|
38 | |
19 | |
13 | |
13 | |
11 | |
10 | |
10 | |
10 | |
8 | |
8 |