import json
try:
api
except NameError:
class api:
@staticmethod
def send(port, data):
print("Send data '%s' to '%s'" % (str(data), port))
@staticmethod
def set_port_callback(port, callback):
print("Call '%s' to simulate behavior when messages arrive at port '%s'" % (callback.__name__, port))
callback()
@staticmethod
def add_timer(interval, callback):
print("Call '%s' to simulate behavior when timer calls the callback." % (callback.__name__))
callback()
@staticmethod
def Message(body, attributes):
return Message(body, attributes)
class config:
env = 'local' # used to detect the local env
your_config_var = 'value'
class logger:
@staticmethod
def info(s):
print("info: %s" % (s))
@staticmethod
def debug(s):
print("debug: %s" % (s))
@staticmethod
def warn(s):
print("warn: %s" % (s))
@staticmethod
def error(s):
print("error: %s" % (s))
class Message:
def __init__(self, body, attributes):
self.body = body
self.attributes = attributes
def __str__(self):
return json.dumps({'attributes': self.attributes, 'body': str(self.body)})
'''
your operator coding
'''
counter = 0
def interface(msg=None):
api.logger.info("received a message")
api.logger.info("incrementing the counter")
global counter
counter += 1
api.logger.info("sending the current message count to the output")
api.send("result", counter)
'''
Starting the operator
'''
api.set_port_callback("input", interface)
# detecting environment (local vs. vflow)
try:
api.config.env
print("Operator started in local environment")
# mock an incoming message from a previous operator
msg = api.Message("message", {})
interface(msg)
except AttributeError:
print("Operator started in productive mode")
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
40 | |
25 | |
17 | |
13 | |
8 | |
7 | |
7 | |
7 | |
6 | |
6 |