The SAP Data Hub pipeline engine offers the possibility to develop custom operators in Python, JavaScript, Go, R, and other languages. However, developing in a local environment offers a lot of great features that you don't want to miss in your daily work:

  • Code checks and completion: SAP Data Hub only provides a basic text editor – of course it is not a full IDE.

  • Immediate startup: A pipeline runs as a dynamic Docker container and, depending on your configuration, so do individual operators. To get your code pipeline-ready, start with local development and reduce the "trial and error" caused by coding flaws.

  • Error messages: Easily access full error messages including the stack trace.

  • Debugging: Debug your operator's code with breakpoints.

  • Unit tests: Automate the testing of your operator (development operations); see the test sketch at the end of this post.

  • …


Keeping this in mind, we started to develop a mock of the pipeline engine. It offers the possibility to develop and test your code locally in the development environment of your choice, with all those features that make for a great developer experience. When you're done, you just copy and paste the code into the SAP Data Hub pipeline and it works. With this we were able to increase our development speed in recent projects. But how is this done?

As a starting point, we based our extensions on the code from Jens Rannacher's blog series. We generalized the mock engine and added some more interfaces that are also provided by the pipeline engine API in SAP Data Hub. Below you can find the code of the "mock pipeline engine" implemented in Python with a small usage example. Feel free to develop a similar local simulation in other languages and share it with the community.

 
import json

try:
    api
except NameError:
    class api:
        @staticmethod
        def send(port, data):
            print("Send data '%s' to '%s'" % (str(data), port))

        @staticmethod
        def set_port_callback(port, callback):
            print("Call '%s' to simulate behavior when messages arrive at port '%s'" % (callback.__name__, port))
            callback()

        @staticmethod
        def add_timer(interval, callback):
            print("Call '%s' to simulate behavior when timer calls the callback." % (callback.__name__))
            callback()

        @staticmethod
        def Message(body, attributes):
            return Message(body, attributes)

        class config:
            env = 'local'  # used to detect the local env
            your_config_var = 'value'

        class logger:
            @staticmethod
            def info(s):
                print("info: %s" % (s))

            @staticmethod
            def debug(s):
                print("debug: %s" % (s))

            @staticmethod
            def warn(s):
                print("warn: %s" % (s))

            @staticmethod
            def error(s):
                print("error: %s" % (s))

    class Message:
        def __init__(self, body, attributes):
            self.body = body
            self.attributes = attributes

        def __str__(self):
            return json.dumps({'attributes': self.attributes, 'body': str(self.body)})

'''
your operator coding
'''
counter = 0

def interface(msg=None):
    api.logger.info("received a message")

    api.logger.info("incrementing the counter")
    global counter
    counter += 1

    api.logger.info("sending the current message count to the output")
    api.send("result", counter)

'''
Starting the operator
'''
api.set_port_callback("input", interface)

# detecting environment (local vs. vflow)
try:
    api.config.env
    print("Operator started in local environment")

    # mock an incoming message from a previous operator
    msg = api.Message("message", {})
    interface(msg)
except AttributeError:
    print("Operator started in productive mode")