Leo Richard Irudayam

How to connect Apache Kafka via SAP Cloud Connector with SAP Cloud Platform and consume via Node.js app on Cloud Foundry

This blog post shows how to connect Apache Kafka to SAP Cloud Platform via the SAP Cloud Connector and use it for subscribing, publishing, or both. This was originally part of a research project, so this guide does not guarantee enterprise readiness out of the box.

The biggest difficulty in connecting Apache Kafka to a Node.js application on Cloud Foundry is that Kafka uses a binary protocol directly over TCP (https://kafka.apache.org/protocol.html). This means your connection has to work at the TCP level rather than the HTTP level. As a result, we cannot use the SAP Cloud Platform Destination Service on Cloud Foundry, and the TCP connection to Apache Kafka requires a SOCKS5 client with a custom authentication method to access the Connectivity Proxy.
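To illustrate what "binary protocol over TCP" means, here is a minimal sketch (not part of the original setup) that frames a Kafka ApiVersions request by hand using only Node's Buffer. It assumes ApiVersions version 0 with request header v1, as described in the Kafka protocol guide; the function name encodeApiVersionsRequest is hypothetical. These bytes go straight onto a TCP socket: there is no HTTP request line, URL or header that an HTTP destination could route.

```javascript
// Sketch: frame a Kafka ApiVersions (v0) request as raw bytes.
// Layout per the Kafka protocol guide:
//   int32  size of everything that follows
//   int16  api_key        (18 = ApiVersions)
//   int16  api_version    (0)
//   int32  correlation_id
//   int16  client_id length, then the client_id bytes (request header v1)
// ApiVersions v0 has an empty request body.
function encodeApiVersionsRequest(correlationId, clientId) {
  const id = Buffer.from(clientId, "utf8");
  const payloadSize = 2 + 2 + 4 + 2 + id.length;
  const buf = Buffer.alloc(4 + payloadSize);
  let pos = buf.writeInt32BE(payloadSize, 0); // every Kafka request is size-prefixed
  pos = buf.writeInt16BE(18, pos);            // api_key: ApiVersions
  pos = buf.writeInt16BE(0, pos);             // api_version
  pos = buf.writeInt32BE(correlationId, pos); // echoed back by the broker
  pos = buf.writeInt16BE(id.length, pos);     // client_id length
  id.copy(buf, pos);                          // client_id bytes
  return buf;
}

const frame = encodeApiVersionsRequest(1, "dw-client");
console.log(frame.toString("hex"));
```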

What you need before you start:

  • Apache Kafka installed on your on-premise landscape (Quickstart)
  • An SAP Cloud Platform Cloud Foundry account including an assigned space. You must be an administrator to connect your Cloud Connector to it later; otherwise you will get an HTTP 417 error when trying to connect.

Cloud Foundry services you’ll need:

  • Connectivity Service
  • Authorization & Trust Management Service (xsuaa)

Before you start installing the SAP Cloud Connector, be aware that this is a security-critical installation: you might need to open ports from your server or VM to the outside.

Preparation

The guide on how to install the SAP Cloud Connector can be found here, and the guide on how to configure it to work with your SAP Cloud Platform account can be found here.

Note: In my setup, I did not use a location ID, since this was the only Cloud Connector connected to the SCP account. If you do need a location ID, adapt the SOCKS5 authentication request in the code sample later in this post.

Once the connection between your server and your SCP account is established, you can set up a cloud-to-on-premise mapping as described here. I used localhost:9092 as the internal host (9092 is Kafka's default port; don't confuse it with the port used by Apache ZooKeeper) and “kafka.cloud:9092” as the virtual host. When you check availability, the SAP Cloud Connector should report “Reachable”. You’ll also find this mapping in the SAP Cloud Platform Cockpit under Cloud Connectors. Because this is a TCP mapping, it is expected that no resources are shown and that you cannot add any path mappings.

Next, create service instances of the Connectivity service and of XSUAA (documentation here). XSUAA is required to obtain a JWT for the Connectivity Proxy. Both service instances need to be bound to your Node.js application.

There is a very good guide on how you can do this and what code you’ll need to write within your Node.js app: https://blogs.sap.com/2019/04/02/a-do-it-yourself-at-home-guide-how-to-connect-a-node.js-app-on-sap-cloud-platform-for-the-cloud-foundry-to-an-s4hana-on-premise-system-securely-via-cloud-connector/
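As a rough sketch of the token step, the following reads the Connectivity service binding from VCAP_SERVICES and requests a JWT from XSUAA using the OAuth 2.0 client-credentials grant. The credential field names (clientid, clientsecret, token_service_url, onpremise_proxy_host, onpremise_socks5_proxy_port) are assumptions based on typical Connectivity bindings, so check them against your own VCAP_SERVICES. The helper names are hypothetical, and the global fetch requires Node.js 18 or later.

```javascript
// Sketch only: field names below are assumed from typical Connectivity service
// bindings; verify them against the VCAP_SERVICES of your own app.
function readConnectivityBinding(vcapServicesJson) {
  const services = JSON.parse(vcapServicesJson);
  const binding = (services.connectivity || [])[0];
  if (!binding) throw new Error("No connectivity service bound to this app");
  const c = binding.credentials;
  return {
    clientId: c.clientid,
    clientSecret: c.clientsecret,
    tokenUrl: c.token_service_url.replace(/\/$/, "") + "/oauth/token",
    proxyHost: c.onpremise_proxy_host,
    proxyPort: Number(c.onpremise_socks5_proxy_port), // SOCKS5 port of the proxy
  };
}

// OAuth 2.0 client-credentials grant against XSUAA; resolves to the access token (a JWT).
// Uses the global fetch available in Node.js 18+.
async function fetchConnectivityToken(binding) {
  const basic = Buffer.from(`${binding.clientId}:${binding.clientSecret}`).toString("base64");
  const res = await fetch(binding.tokenUrl, {
    method: "POST",
    headers: {
      Authorization: `Basic ${basic}`,
      "Content-Type": "application/x-www-form-urlencoded",
    },
    body: new URLSearchParams({ grant_type: "client_credentials" }),
  });
  if (!res.ok) throw new Error(`Token request failed: HTTP ${res.status}`);
  const body = await res.json();
  return body.access_token;
}
```

In the app this would be used as fetchConnectivityToken(readConnectivityBinding(process.env.VCAP_SERVICES)); the resulting token is what the SOCKS5 authentication step sends, and proxyHost/proxyPort could replace the hard-coded proxy values in the sample below.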

Using Connectivity Proxy

Now for the most crucial part: the connection to Kafka via the Connectivity Proxy. An SAP Help document (with code samples in Java), which you can find here, shows how TCP connections are handled.

The key point is that you need a SOCKS5 client with a custom authentication method to reach Kafka. At the time of writing, no Node package supported SOCKS5 together with JWT authentication. However, the socksv5 package lets you plug in your own authentication handlers, which is what I used. I used the @luminati-io/socksv5 variant rather than the original package, since the original caused some errors.

For simplicity, the following code sample uses hard-coded host names and ports.

var socks = require("@luminati-io/socksv5");

var STATE_AUTHTYPE = 0,
    STATE_AUTHSTATUS = 1;

// jwtToken: JWT for the Connectivity service, obtained from XSUAA as described above.
function connectToKafka(jwtToken) {
    socks.connect({
        host: "kafka.cloud",   // virtual host from the Cloud Connector mapping
        port: 9092,
        proxyHost: "connectivityproxy.[...].hana.ondemand.com",
        proxyPort: 20004,
        localDNS: false,       // important: let the proxy resolve "kafka.cloud"
        auths: [{
            METHOD: 0x80,      // SAP JWT authentication (private SOCKS5 method)
            client: function clientHandler(stream, cb) {
                var state = STATE_AUTHTYPE;

                // Parses the proxy's reply: 1 byte version (0x01), 1 byte status (0x00 = OK).
                function onData(chunk) {
                    var i = 0,
                        len = chunk.length;
                    while (i < len) {
                        switch (state) {
                            case STATE_AUTHTYPE:
                                if (chunk[i] !== 0x01) {
                                    stream.removeListener("data", onData);
                                    cb(new Error("Unsupported auth request version: " + chunk[i]));
                                    return;
                                }
                                ++i;
                                state = STATE_AUTHSTATUS;
                                break;
                            case STATE_AUTHSTATUS:
                                var status = chunk[i];
                                state = null;
                                stream.removeListener("data", onData);
                                cb(status === 0); // true = authenticated
                                return;
                        }
                    }
                }

                stream.on("data", onData);

                // === Authenticate ===
                // Send the following bytes:
                // 1 byte  - authentication method version: 1
                // 4 bytes - length of the JWT acquired from XSUAA (big-endian)
                // X bytes - the JWT itself
                // 1 byte  - length of the Cloud Connector location ID: 0, no location ID used
                // Y bytes - the location ID (empty here)
                var tokenLen = Buffer.byteLength(jwtToken, "utf8");
                var buf = Buffer.alloc(1 + 4 + tokenLen + 1);
                var pos = buf.writeUInt8(0x01, 0);
                pos = buf.writeInt32BE(tokenLen, pos);
                pos += buf.write(jwtToken, pos, "utf8"); // write() returns bytes written, not an offset
                buf.writeUInt8(0x00, pos);               // location ID length: none

                stream.write(buf);
            },
        }],
    }, function (socket) {
        workWithKafka(socket);
    }).on("error", function (err) {
        console.log(err);
    });
}

This snippet establishes the connection to Kafka through the Connectivity Proxy. The SOCKS5 protocol proceeds in a few simple steps:

  • Greeting of client and server
  • Client offers authentication options
  • Server chooses authentication option
  • Client sends authentication request
  • Server verifies authentication request
  • Client requests a connection
  • Server establishes the connection
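For reference, here is a sketch of the bytes behind the greeting, method selection and connection request, as defined in RFC 1928. socksv5 builds these messages itself, so this is illustrative only, and the function names are hypothetical. Note that the CONNECT request carries the virtual host name (address type 0x03) rather than an IP address, which is why the proxy, not the app, has to resolve kafka.cloud.

```javascript
// Sketch of the SOCKS5 (RFC 1928) messages around the custom auth step.
const SOCKS_VERSION = 0x05;
const METHOD_SAP_JWT = 0x80; // from the range 0x80-0xFE reserved for private methods

// Greeting in which the client offers exactly one auth method (0x80).
function buildGreeting() {
  return Buffer.from([SOCKS_VERSION, 0x01, METHOD_SAP_JWT]);
}

// Server's method selection; true when the proxy accepted method 0x80.
function serverChoseJwt(reply) {
  return reply.length >= 2 && reply[0] === SOCKS_VERSION && reply[1] === METHOD_SAP_JWT;
}

// CONNECT request addressed by domain name (ATYP 0x03), sent after authentication.
function buildConnectRequest(host, port) {
  const name = Buffer.from(host, "ascii");
  if (name.length > 255) throw new RangeError("host name too long for SOCKS5");
  const buf = Buffer.alloc(4 + 1 + name.length + 2);
  buf[0] = SOCKS_VERSION;
  buf[1] = 0x01; // CMD: CONNECT
  buf[2] = 0x00; // RSV
  buf[3] = 0x03; // ATYP: domain name
  buf[4] = name.length;
  name.copy(buf, 5);
  buf.writeUInt16BE(port, 5 + name.length); // port in network byte order (big-endian)
  return buf;
}

console.log(buildGreeting().toString("hex")); // 050180
console.log(buildConnectRequest("kafka.cloud", 9092).toString("hex"));
```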

The socksv5 package covers the greeting, the method negotiation and the connection request. However, the Connectivity Proxy requires authentication method 0x80 (a value RFC 1928 reserves for private methods), so you have to implement the authentication request and the evaluation of the proxy's answer yourself. Be aware that the integers are big-endian (BE)!
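Here is a minimal sketch of that custom authentication exchange with the optional location ID included, writing every field at an explicit offset. The layout (version byte, 4-byte big-endian JWT length, JWT, 1-byte location ID length, location ID) follows the comments in the connection snippet; the function names are hypothetical. The location ID is written as plain UTF-8 here; whether your landscape expects a different encoding (for example Base64) is an assumption you should verify against the SAP Help document.

```javascript
// Sketch: build the 0x80 authentication request for the Connectivity Proxy.
// locationId is optional; an empty string produces a length byte of 0.
// Encoding of the location ID (plain UTF-8 here) is an assumption to verify.
function buildJwtAuthRequest(jwtToken, locationId = "") {
  const token = Buffer.from(jwtToken, "utf8");
  const location = Buffer.from(locationId, "utf8");
  if (location.length > 255) throw new RangeError("location ID must fit in one length byte");
  const buf = Buffer.alloc(1 + 4 + token.length + 1 + location.length);
  let pos = buf.writeUInt8(0x01, 0);            // auth method version: 1
  pos = buf.writeUInt32BE(token.length, pos);   // JWT length, 4 bytes big-endian
  pos += token.copy(buf, pos);                  // JWT bytes
  pos = buf.writeUInt8(location.length, pos);   // location ID length, 1 byte
  location.copy(buf, pos);                      // location ID bytes (may be empty)
  return buf;
}

// The proxy answers with two bytes: version (0x01) and status (0x00 = success).
function parseJwtAuthResponse(reply) {
  if (reply.length < 2) return { complete: false };
  if (reply[0] !== 0x01) throw new Error("Unsupported auth response version: " + reply[0]);
  return { complete: true, success: reply[1] === 0x00 };
}
```

Without a location ID the message ends in a single 0x00 byte, which matches the request sent in the snippet above; with a location ID, its length goes into that byte and the ID bytes follow immediately, with no further length field.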

 

Interact with Kafka

The following snippet uses kafkajs as the Kafka client for Node.js. Apart from a custom socketFactory that hands kafkajs the socket from the proxy connection, it uses standard kafkajs functionality.
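For contrast, a socket factory that does not go through the proxy would simply open a new TCP connection per call; kafkajs's built-in factory works roughly like this (it additionally handles TLS when ssl is set). This sketch, with the hypothetical name directSocketFactory, shows the contract the custom factory has to satisfy: accept { host, port, ssl, onConnect }, return a socket synchronously, and call onConnect once the connection is up. It would only work if the broker were directly reachable from Cloud Foundry, which is not the case for an on-premise broker.

```javascript
const net = require("net");

// Plain TCP socket factory in the shape kafkajs expects (no proxy, no TLS).
function directSocketFactory({ host, port, onConnect }) {
  const socket = net.connect({ host, port }, onConnect); // onConnect fires once connected
  socket.setKeepAlive(true, 60 * 1000);                  // keep idle broker connections alive
  return socket;                                         // returned synchronously
}
```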

const { Kafka, logLevel } = require("kafkajs");

async function workWithKafka(socket) {
    // kafkajs calls this factory whenever it opens a broker connection. Instead of
    // dialing host:port itself, it hands back the socket already tunnelled through
    // the Connectivity Proxy (host, port and ssl are therefore ignored).
    // Every call returns the same socket, so all kafkajs connections share one
    // TCP stream; this is a simplification for a single-broker test setup.
    const myCustomSocketFactory = ({ onConnect }) => {
        socket.setKeepAlive(true, 60 * 60 * 1000);
        onConnect(); // the proxied socket is already connected
        return socket;
    };

    const brokers = ["kafka.cloud:9092"]; // virtual host from the Cloud Connector mapping

    const kafka = new Kafka({
        clientId: "dw-client",
        brokers: brokers,
        retry: {
            initialRetryTime: 5000,
            retries: 2,
        },
        requestTimeout: 30000,
        authenticationTimeout: 7000,
        socketFactory: myCustomSocketFactory,
        logLevel: logLevel.ERROR,
    });

    const producer = kafka.producer();
    const consumer = kafka.consumer({
        groupId: "test-group",
    });

    const run = async () => {
        // Producing
        await producer.connect();
        await producer.send({
            topic: "nodejs-trial",
            messages: [{ value: "Hello KafkaJS user!" }],
        });
        console.log("Message is sent");

        // Consuming
        await consumer.connect();
        await consumer.subscribe({
            topic: "nodejs-trial",
            fromBeginning: true,
        });

        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                });
            },
        });

        await producer.send({
            topic: "test-topic",
            messages: [{ value: "I want more" }],
        });
    };

    run().catch(console.error);
}

 

Conclusion

This is all you need. I hope this tutorial, and especially the custom implementation of the SOCKS5 authentication process with its code snippets, was helpful for you. If you decide to run Kafka on premise, at the time of writing there is no other option than implementing an adapted version of SOCKS5 to connect to the Connectivity Proxy. Leave a like or a comment if this was useful for you.


      4 Comments
      Diego García

      Hello,

      Thanks for the explanation! Currently, I'm having problems with authentication when connecting to the connectivity proxy of SCP. Would you mind sharing the full implementation code so I can check where I might be going wrong?

       

      Thanks in advance,

      Diego Garcia

      Leo Richard Irudayam
      Blog Post Author

      Hello Diego,

      sure, you find this code here: https://github.com/lirudayam/uni-li-wue-project-seminar/blob/master/dw-kafka-processor/server.js

      Please note that it's not the cleanest code, since this was an MVP.

       

      Best,

      Leo

      sara moreno da silva

      Hi Leo Richard Irudayam

       

      I'm trying to use your code with location ID and it seems it's not working, I don't know if I'm doing something wrong.

      It would be really appreciated if you can help here 🙂

      My code:

      var len = Buffer.byteLength(jwtTokenConnectivityService, "utf8");
      var len2 = Buffer.byteLength("SAPCC-DEV", "utf8");//location id length
      var buf = Buffer.alloc(5 + 1 + len + len2); //10 + location id
      buf[0] = 0x01;
      var pos = 1;
      pos = buf.writeInt32BE(len, pos);
      pos = buf.write(jwtTokenConnectivityService, pos);
      pos += 5;
      //buf[pos] = 0x00;

      buf[pos] = len2//0x00; location id length
      pos = buf.writeInt32BE(len2, pos); //position
      pos = buf.write("SAPCC-DEV", pos); // write location id on buff

      Thanks in advance
      Rajesh PS

      Hello Leo Richard Irudayam

       

      Nice blog indeed. Cheers to this!

      I would like to know whether the Kafka adapter in SAP CPI is fully fledged and how flexible it is in comparison with the SAP PO Kafka adapter.

      Secondly, does SAP CPI currently support a schema registry and serialization, and what about Avro and JSON conversions?

      Is it a tactical, reliable long-term solution to use via SAP CPI in the cloud?

      I'm also not sure about the license/subscription cost of using the Kafka adapter, which ideally should not end up in capacity or feature constraints; each option has its pros and cons.

      Looking forward to your valuable thoughts. Thanks in advance!