Jacky Liu

Develop a Kafka REST API with Node.js and Make It Callable from CPI through SAP Cloud Connector, Part II

In the previous blog post, Part I, we prepared the on-premise Kafka environment. In this post we build a Node.js REST API proxy for Kafka and use CPI to produce and consume Kafka messages through the deployed REST API.

We will follow these steps:

Step 1: Deploy a CPI iFlow to consume the Kafka topic's messages.

Step 2: Deploy a Node.js REST API that produces messages to Kafka, consumes Kafka messages, and forwards them to CPI.

Code snippet:

server.js

const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender';

const getClientCredentials = oauth.client(axios.create(), {
    url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token',
    grant_type: 'client_credentials',
    // Placeholders: replace with the values from the CPI runtime service key
    client_id: '<client_id_from_cpi_runtime_service_key>',
    client_secret: '<client_secret_from_cpi_runtime_service_key>',
    scope: ''
  });

const app = express();

const kafka = new Kafka({
    clientId: 'my-app',
    requestTimeout: 25000,
    connectionTimeout: 30000,
    authenticationTimeout:30000,
    retry: {
      initialRetryTime: 3000,
      retries: 0
    },
    logLevel: logLevel.INFO,
    brokers: ['localhost:9092']
  });

app.use(express.text())
app.use(bodyParser.json());

app.listen(4004,  () => { console.log('===> Server started') })


// Deploy an API to produce messages to Kafka
app.post('/kafka/:topic', async (req, res) => {
    const topicv = req.params.topic;
    console.log(topicv);

    // The message is expected as JSON: { "msg": "..." }
    if (!req.body || req.body.msg === undefined) {
        return res.status(400).send('missing msg in request body');
    }

    // Reuse the Kafka client created above instead of building a new one per request
    const producer = kafka.producer();
    try {
        await producer.connect();
        await producer.send({
            topic: topicv,
            messages: [
                { value: String(req.body.msg) },
            ],
        });
        res.status(200).send('message sent');
    } catch (e) {
        console.log(e);
        res.status(500).send('error');
    } finally {
        // Always release the connection, even when sending fails
        await producer.disconnect().catch((e) => console.log(e));
    }
});


// Consume Kafka messages and forward them to CPI
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'dblab01', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const value = message.value.toString();
      console.log({ partition, offset: message.offset, value });

      try {
        // Fetch an OAuth token for the CPI runtime, then post the message to the iFlow endpoint
        const token1 = await getClientCredentials();
        const config = {
          headers: {
            'Authorization': `Bearer ${token1.access_token}`,
            'Content-Type': 'application/json'
          }
        };
        const data = { msg: value };
        const res = await axios.post(cpiurl, data, config);
        // Log only the HTTP status; the full axios response object is very verbose
        console.log(res.status);
      } catch (e) {
        // Log and continue, so a failed forward does not stop the consumer
        console.log(e);
      }
    },
  });
};

run().catch((e) => {
    console.log(e);
});
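
The consumer in server.js requests a new OAuth token from the CPI tenant for every Kafka message. As one possible enhancement, the sketch below caches the token until shortly before it expires. The helper name createTokenCache, its parameters, and the 60-second safety margin are hypothetical and not part of the original code. It also assumes the token response carries an expires_in field in seconds, as OAuth 2.0 token responses usually do.

```javascript
// Hypothetical helper: wraps any async token fetcher (for example getClientCredentials)
// and reuses the returned token until shortly before it expires.
function createTokenCache(fetchToken, now = () => Date.now(), safetyMs = 60000) {
  let cached = null;   // last token response
  let expiresAt = 0;   // timestamp (ms) after which a new token is fetched

  return async function getToken() {
    if (cached && now() < expiresAt) {
      return cached;
    }
    const token = await fetchToken();
    // Assumption: expires_in is given in seconds; if it is missing, nothing is reused.
    const lifetimeMs = Number(token.expires_in || 0) * 1000;
    cached = token;
    expiresAt = now() + Math.max(lifetimeMs - safetyMs, 0);
    return cached;
  };
}

// Possible usage inside server.js (assumption, not from the original post):
// const getCachedToken = createTokenCache(getClientCredentials);
// const token1 = await getCachedToken();
```

The clock is passed in as a parameter only so the expiry logic can be checked without waiting for a real token to expire.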





package.json

{
  "name": "capkafka",
  "version": "1.0.0",
  "description": "capkafka",
  "main": "server.js",
  "dependencies": {
    "axios": "^0.27.2",
    "axios-oauth-client": "^1.4.4",
    "body-parser": "latest",
    "express": "^4.17.1",
    "kafkajs": "latest",
    "passport": "^0.4.0"
  },
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "jacky liu",
  "license": "ISC"
}

Start the Node.js application (install the dependencies with npm install, then run node server.js):

Step 3: Test the deployed REST API locally with Postman.
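
If Postman is not at hand, the same request can be sent from a small Node.js script. This is a minimal sketch. The port 4004, the /kafka/:topic route, and the { msg } body shape are taken from server.js, while the function names, the topic, and the message text are placeholders. It assumes Node.js 18 or later, where fetch is available globally.

```javascript
// Build the POST request that the /kafka/:topic route in server.js expects.
// baseUrl defaults to the local server from this post; adjust it if yours differs.
function buildProduceRequest(topic, msg, baseUrl = 'http://localhost:4004') {
  return {
    url: `${baseUrl}/kafka/${encodeURIComponent(topic)}`,
    options: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ msg }),
    },
  };
}

// Send the request and return the HTTP status and response text.
async function produce(topic, msg) {
  const { url, options } = buildProduceRequest(topic, msg);
  const response = await fetch(url, options); // global fetch, Node.js 18+
  return { status: response.status, text: await response.text() };
}

// Usage (with the server from Step 2 running):
// produce('dblab01', 'hello from a test client').then(console.log);
```

Using the topic dblab01 here means the consumer in server.js, which subscribes to that topic, picks the message up and forwards it to CPI.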

 

Step 4: Check the message in CPI.

Step 5: Deploy an iFlow that calls the Node.js REST API to produce messages in Kafka. I will skip this step. The CPI HTTP or HTTPS adapter supports calling on-premise REST APIs with the help of SAP Cloud Connector.

The end

Best regards!

Jacky Liu

 

 

      2 Comments
      Jacky Liu
      Blog Post Author

      Hi, dear readers:

      This blog post is for investigation purposes only. It is not meant for production use, but you can enhance it for further use.

      Best regards!

      Jacky Liu

       

      Muni M

      Thank you for the comment. This clarifies it better.