Send your Raspberry Pi sensor data to SAP Vora via Apache Kafka managed by the SAP Data Hub
18 months ago, I sent my first sensor data via a Raspberry Pi to the SAP Cloud Platform to analyse it with the SAP Cloud Platform predictive services or to measure it with the SAP Smart Business Service.
Since then, not only has my device shrunk from a Raspberry Pi 3 Model B to a Raspberry Pi Zero W:
But Apache Kafka has also become the de facto standard distributed streaming platform for IoT:
Therefore, in this blog I describe how I send my Raspberry Pi sensor data to SAP Vora via Apache Kafka managed by the SAP Data Hub.
To start with, I verify that my Data Pipelines can access my Apache Kafka installation with the pre-delivered Kafka Data Pipeline that comes with the SAP Data Hub:
I only need to adjust the brokers parameter according to my Kafka settings in Ambari:
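Independently of the Data Hub, a quick way to double-check that the broker is reachable is to list its topics with the Kafka AdminClient. This is only a minimal sketch, assuming a Kafka client library of version 0.11 or later; the broker address is the one from my Ambari settings:
import org.apache.kafka.clients.admin.AdminClient;
import java.util.Properties;
public class KafkaTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "linux-p2i7:6667"); // my broker as configured in Ambari
        try (AdminClient admin = AdminClient.create(props)) {
            // Listing the topics proves that the broker answers
            System.out.println(admin.listTopics().names().get());
        }
    }
}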
That proven, I adjust my Java code to send my Raspberry Pi sensor data to Kafka rather than to the SAP Cloud Platform and schedule it again to run every minute:
import com.pi4j.io.i2c.I2CBus;
import com.pi4j.io.i2c.I2CDevice;
import com.pi4j.io.i2c.I2CFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.sql.Timestamp;
import java.util.Properties;
public class Kafka {
    public static void main(String[] args) {
        try {
            // MCP9801 temperature sensor on I2C bus 1, address 0x4f
            I2CBus i2cBus = I2CFactory.getInstance(I2CBus.BUS_1);
            I2CDevice mcp9801 = i2cBus.getDevice(0x4f);
            byte[] buffer = new byte[2];
            Properties props = new Properties();
            props.put("bootstrap.servers", "linux-p2i7:6667");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            Producer<String, String> producer = new KafkaProducer<>(props);
            // Read the two-byte ambient temperature register
            int bytes = mcp9801.read(0, buffer, 0, 2);
            Timestamp timestamp = new Timestamp(System.currentTimeMillis());
            // buffer[0] is the integer part of the temperature, buffer[1] the fraction
            // (Java bytes are signed, so 0x80 = 0.5 degrees reads as -128 and the subtraction adds it back)
            double temperature = (double) buffer[0] - (double) buffer[1] / 256;
            // Message body: <timestamp>,<temperature> with a constant key
            producer.send(new ProducerRecord<String, String>("raspberrypi", Integer.toString(0), timestamp + "," + Double.toString(temperature)));
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
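To confirm that the messages actually arrive in the topic before wiring up the Data Pipeline, a throwaway consumer like the following sketch can help; the group id raspberrypi_check is an arbitrary name I picked for this check:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class KafkaCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "linux-p2i7:6667");
        props.put("group.id", "raspberrypi_check"); // hypothetical group id for this check
        props.put("auto.offset.reset", "earliest"); // also read messages sent before this check
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("raspberrypi"));
            // Poll once for up to ten seconds and print whatever has arrived
            ConsumerRecords<String, String> records = consumer.poll(10000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // e.g. 2018-10-01 12:00:00.0,21.5
            }
        }
    }
}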
Next, I build my Data Pipeline with a Kafka Consumer and a SAP Vora Inserter, similar to my blog Schedule complex Data Pipelines with the SAP Data Hub, and run it:
In this Data Pipeline I listen to the Kafka topic raspberrypi and store the respective messages in the SAP Vora table RASPI. Each message body, <timestamp>,<temperature>, maps onto the two table columns Timestamp and Temperature. You will find the JSON file with the details at the end of this blog.
As a result, I can inspect my Raspberry Pi sensor data in the SAP Vora Tools:
With my Raspberry Pi sensor data in SAP Vora, the options to uncover insights from it are of course manifold.
By the way, this is my Data Pipeline in JSON format:
{
  "properties": {},
  "description": "",
  "processes": {
    "kafkaconsumer1": {
      "component": "com.sap.kafka.consumer",
      "metadata": {
        "label": "Kafka Consumer",
        "x": 16,
        "y": 12,
        "height": 80,
        "width": 120,
        "config": {
          "zookeepers": "linux-p2i7:2181",
          "offset": "newest",
          "groupId": "test_group",
          "topics": "raspberrypi"
        }
      }
    },
    "sapvorainserter1": {
      "component": "com.sap.vora.inserter",
      "metadata": {
        "label": "SAP Vora Inserter",
        "x": 199.99999904632568,
        "y": 12,
        "height": 80,
        "width": 120,
        "config": {
          "dsn": "v2://linux-p76x:2202/?binary=true",
          "initStatements": "",
          "tableColumnTypes": "Timestamp TIMESTAMP, Temperature Double",
          "tableName": "RASPI"
        }
      }
    }
  },
  "groups": [],
  "connections": [
    {
      "metadata": {
        "points": "140,52 167.99999952316284,52 167.99999952316284,61 195.99999904632568,61"
      },
      "src": {
        "port": "message",
        "process": "kafkaconsumer1"
      },
      "tgt": {
        "port": "inmessage2",
        "process": "sapvorainserter1"
      }
    }
  ],
  "inports": {},
  "outports": {}
}
Update for SAP Data Hub 2.3.3: the Kafka Consumer2 operator now connects directly via the brokers parameter instead of ZooKeeper, and the Vora Avro Ingestor, configured through the Configuration Manager connection VORA, replaces the SAP Vora Inserter:
{
  "properties": {},
  "description": "",
  "processes": {
    "kafkaconsumer21": {
      "component": "com.sap.kafka.consumer2",
      "metadata": {
        "label": "Kafka Consumer2",
        "x": 17,
        "y": 12,
        "height": 80,
        "width": 120,
        "config": {
          "brokers": "ambari.com:6667",
          "topics": "raspi",
          "groupId": "test_group"
        }
      }
    },
    "voraavroingestor1": {
      "component": "com.sap.vora.avroingestor",
      "metadata": {
        "label": "Vora Avro Ingestor",
        "x": 186,
        "y": 12,
        "height": 80,
        "width": 120,
        "config": {
          "defaultAvroSchema": "{\"name\":\"raspi\",\"type\":\"record\",\"fields\":[{\"name\":\"millis\",\"type\":\"timestamp-millis\"},{\"name\":\"temperature\",\"type\":\"double\"}]}",
          "connectionType": "connection",
          "connection": {
            "configurationType": "Configuration Manager",
            "connectionID": "VORA",
            "connectionProperties": {}
          },
          "format": "csv"
        }
      }
    }
  },
  "groups": [],
  "connections": [
    {
      "metadata": {
        "points": "141,52 181,52"
      },
      "src": {
        "port": "message",
        "process": "kafkaconsumer21"
      },
      "tgt": {
        "port": "in",
        "process": "voraavroingestor1"
      }
    }
  ],
  "inports": {},
  "outports": {}
}
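The producer side changes accordingly for the updated pipeline: with format csv, the Vora Avro Ingestor expects the record fields in the order of the defaultAvroSchema, that is the epoch milliseconds followed by the temperature. A minimal sketch under these assumptions, with a hard-coded reading standing in for the MCP9801 readout above:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaRaspi {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ambari.com:6667");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<>(props);
        // CSV field order must match the defaultAvroSchema: millis, temperature
        double temperature = 21.5; // hypothetical reading, see the MCP9801 code above
        producer.send(new ProducerRecord<>("raspi", System.currentTimeMillis() + "," + temperature));
        producer.close();
    }
}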