Skip to Content

The SAP Data Hub pipeline engine offers the possibility to develop custom operators in Python, JavaScript, Go, R and more languages. But, developing in a local environment offers a lot of great features, you don´t want to miss in your daily work:

  • Code checks and completion: SAP Data Hub provides a basic text editor – of course it is not a full IDE.
  • Immediate startup: A pipeline runs as a dynamic docker container, depending on your configuration also individual operators. To get your code pipeline-ready, start with a local development and reduce the “trial and error” due to coding flaws.
  • Error messages: Easily access full error messages including stack.
  • Debugging: Debugging the code of your operator with breakpoints.
  • Unit tests: Automate the testing of your operator (development operations).
  • ..

Keeping this in mind, we started to develop a mock of the pipeline engine. This offers the possibility to develop and test your code locally in the development environment of your choice with all those features making a great developer experience. When you´re done, you just copy and paste the code into the SAP Data Hub pipeline and it works. With this we were able to fasten our development speed in recent projects. But how is this done?

As a starting point, we based our extensions on the code from Jens Rannacher’s blog series. We generalized the mock engine and added some more interfaces also provided by the pipeline engine api in SAP Data Hub. Below you can find the code of the “mock pipeline engine” implemented in Python with a small usage example. Feel free to develop a similar local simulation in other languages and share them with the community.

 

import json

try:
    api
except NameError:
    class api:
        @staticmethod
        def send(port, data):
            print("Send data '%s' to '%s'" % (str(data), port))

        @staticmethod
        def set_port_callback(port, callback):
            print("Call '%s' to simulate behavior when messages arrive at port '%s'" % (callback.__name__, port))
            callback()

        @staticmethod
        def add_timer(interval, callback):
            print("Call '%s' to simulate behavior when timer calls the callback." % (callback.__name__))
            callback()

        @staticmethod
        def Message(body, attributes):
            return Message(body, attributes)

        class config:
            env = 'local'  # used to detect the local env
            your_config_var = 'value'

        class logger:
            @staticmethod 
            def info(s):
                print("info: %s" % (s))

            @staticmethod
            def debug(s):
                print("debug: %s" % (s))

            @staticmethod
            def warn(s):
                print("warn: %s" % (s))

            @staticmethod
            def error(s):
                print("error: %s" % (s))

    class Message:
        def __init__(self, body, attributes):
            self.body = body
            self.attributes = attributes

        def __str__(self):
            return json.dumps({'attributes': self.attributes, 'body': str(self.body)})

'''
your operator coding
'''
counter = 0

def interface(msg=None):
    api.logger.info("received a message")

    api.logger.info("incrementing the counter")
    global counter
    counter += 1

    api.logger.info("sending the current message count to the output")
    api.send("result", counter)

'''
Starting the operator
'''
api.set_port_callback("input", interface)

# detecting environment (local vs. vflow)
try:
    api.config.env
    print("Operator started in local environment")

    # mock an incoming message from a previous operator
    msg = api.Message("message", {})
    interface(msg)
except AttributeError:
    print("Operator started in productive mode")
To report this post you need to login first.

Be the first to leave a comment

You must be Logged on to comment or reply to a post.

Leave a Reply