Technical Articles
Develop a Kafka REST API with Node.js and make it callable from CPI through SAP Cloud Connector – Part II
In the previous blog (Part I), we prepared the on-premise Kafka environment. Today we will build a Node.js REST API proxy for Kafka and use CPI to produce and consume Kafka messages through the deployed REST API.
We will follow these steps:
Step 1: Deploy a CPI iFlow that receives the Kafka topic's messages.
Step 2: Deploy a Node.js REST API that produces messages to Kafka, consumes Kafka messages, and forwards them to CPI.
Code snippet:
Server.js
const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
// CPI HTTP endpoint that receives the forwarded Kafka messages (used as the axios.post target below).
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender';
// Function that requests an OAuth token from the token URL below using the
// client_credentials grant; it is called once per consumed Kafka message.
// NOTE(review): client_id_from_cpi_runtime_service_key and
// client_secret_from_cpi_runtime_service_key are not defined anywhere in the
// visible source. They look like placeholders for the values found in the CPI
// runtime service key — replace them (or define them) before running, otherwise
// this statement throws a ReferenceError at load time.
const getClientCredentials = oauth.client(axios.create(), {
url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token',
grant_type: 'client_credentials',
client_id: client_id_from_cpi_runtime_service_key,
client_secret: client_secret_from_cpi_runtime_service_key,
// No specific scope is requested.
scope: ''
});
// Express application that exposes the REST endpoints.
const app = express();

// Shared KafkaJS client for the local broker. Retries are switched off
// (retries: 0), so a failed broker request is reported instead of repeated.
const kafka = new Kafka({
  brokers: ['localhost:9092'],
  clientId: 'my-app',
  logLevel: logLevel.INFO,
  connectionTimeout: 30000,
  authenticationTimeout: 30000,
  requestTimeout: 25000,
  retry: {
    initialRetryTime: 3000,
    retries: 0,
  },
});

// Body parsers: express.text() first, then JSON via body-parser.
app.use(express.text());
app.use(bodyParser.json());

// Start listening on port 4004 and log once the server is up.
app.listen(4004, () => {
  console.log('===> Server started');
});
// Deploy an API to produce a message to Kafka.
/**
 * POST /kafka/:topic
 *
 * Publishes the `msg` field of the request body as a single message to the
 * Kafka topic named in the URL.
 *
 * Responses:
 *   200 'message send'  - the message was handed to the broker
 *   400                 - the request body has no `msg` field
 *   500 'error'         - connecting, sending or disconnecting failed
 */
app.post('/kafka/:topic', (req, res) => {
  const topicv = req.params.topic;
  console.log(topicv);

  // Validate before touching the broker: a body without `msg` (for example a
  // plain-text body) used to raise a TypeError only after the producer had
  // already connected, leaving that connection open.
  const msg = req.body === undefined || req.body === null ? undefined : req.body.msg;
  if (msg === undefined || msg === null) {
    res.status(400).send('missing "msg" in request body');
    return;
  }

  // Reuse the module-level Kafka client instead of building an identical
  // second client on every request.
  const producer = kafka.producer();

  const run = async () => {
    await producer.connect();
    try {
      await producer.send({
        topic: topicv,
        messages: [
          { value: String(msg) },
        ],
      });
    } finally {
      // Always release the broker connection, even when send() rejects.
      await producer.disconnect();
    }
    res.status(200).send('message send');
  };

  run().catch((e) => {
    console.log(e);
    res.status(500).send('error');
  });
});
// Consume Kafka messages and forward each message to CPI.
const consumer = kafka.consumer({ groupId: 'test-group' });

/**
 * Connects the consumer, subscribes to topic 'dblab01' from the beginning and
 * forwards every received message to the CPI endpoint as `{ msg: <value> }`,
 * authenticated with a freshly requested OAuth bearer token.
 *
 * Forwarding is best-effort: a failed token request or HTTP POST is logged and
 * the consumer moves on to the next message.
 */
const run = async () => {
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'dblab01', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      // A record may carry a null value (tombstone); calling toString() on it
      // would throw inside the handler.
      const value = message.value === null ? null : message.value.toString();
      console.log({
        partition,
        offset: message.offset,
        value,
      });
      if (value === null) {
        // Nothing to forward for a record without a value.
        return;
      }

      // Await the whole forward so the handler only resolves once CPI has
      // answered (or the attempt has failed), instead of leaving a floating
      // promise behind.
      try {
        const token1 = await getClientCredentials();
        const config = {
          headers: {
            "Authorization": `Bearer ${token1.access_token}`,
            "Content-Type": "application/json"
          }
        };
        const data = {
          msg: value
        };
        const res = await axios.post(cpiurl, data, config);
        // Log only status and payload: the full axios response object also
        // contains the request config, including the bearer token header.
        console.log(res.status, res.data);
      } catch (e) {
        console.log(e);
      }
    },
  });
};

run().catch((e) => {
  console.log(e);
});
package.json
{
"name": "capkafka",
"version": "1.0.0",
"description": "capkafka",
"main": "server.js",
"dependencies": {
"axios": "^0.27.2",
"axios-oauth-client": "^1.4.4",
"body-parser": "latest",
"express": "^4.17.1",
"kafkajs": "latest",
"passport": "^0.4.0"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "jacky liu",
"license": "ISC"
}
Start the nodejs application :
Step 3: Test the deployed REST API locally with Postman.
Step 4: Check the message in CPI.
Step 5: Deploy an iFlow that calls the Node.js REST API to produce a message in Kafka. I will skip this step. The CPI HTTP/HTTPS adapter supports calling an on-premise REST API with the help of SAP Cloud Connector.
The end
Best regards!
Jacky Liu
Hi, Dear readers:
This blog is for investigation purposes only. It is not intended for production, but you can enhance it for further use.
Best regards!
Jacky Liu
Thank you for the comment. This clarifies it better.