Skip to Content

Nearly a year ago, I had been sending my Raspberry Pi sensor data to SAP Vora via Apache Kafka managed by the SAP Data Hub.

While this has been working flawlessly since then, with MQTT (Message Queuing Telemetry Transport) an extremely lightweight machine-to-machine connectivity protocol that seems optimally suited for my Raspberry Pi sensor data has evolved.

Also, having just upgraded to the new SAP Data Hub 2.3.3, I wanted to see the new SAP Vora Avro Ingestor operator in action.

For this, I setup the lightweight Eclipse Mosquitto MQTT message broker on a plain vanilla Ubuntu server:

sudo apt-get update
sudo apt-get install mosquitto

And install the mosquitto-clients to test subscribe to the messages I will be generating:

sudo apt-get install mosquitto-clients
mosquitto_sub -t "raspi"

Then I adjust my sending Raspberry Pi Java application leveraging Eclipse Paho:

import com.pi4j.io.i2c.I2CBus;
import com.pi4j.io.i2c.I2CFactory;
import com.pi4j.io.i2c.I2CDevice;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.sql.Timestamp;
public class MQTT {
 public static void main(String[] args) {
  String topic = "raspi";
  int qos = 2;
  String broker = "tcp://192.168.2.80:1883";
  String clientId = "Raspberry PI";
  MemoryPersistence persistence = new MemoryPersistence();
  try {
   I2CBus i2cBus = I2CFactory.getInstance(I2CBus.BUS_1);
   I2CDevice mcp9801 = i2cBus.getDevice(0x4f);
   byte[] buffer = new byte[2];
   MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
   MqttConnectOptions connOpts = new MqttConnectOptions();
   connOpts.setCleanSession(true);
   sampleClient.connect(connOpts);
   int bytes = mcp9801.read(0, buffer, 0, 2);
   Timestamp timestamp = new Timestamp(System.currentTimeMillis());
   String content = timestamp + "," + Double.toString((double) buffer[0] - (double) buffer[1] / 256);
   MqttMessage message = new MqttMessage(content.getBytes());
   message.setQos(qos);
   sampleClient.publish(topic, message);
   sampleClient.disconnect();
   System.exit(0);
  } catch (Exception me) {
   System.out.println("msg " + me.getMessage());
   System.out.println("cause " + me.getCause());
   System.out.println("excep " + me);
   me.printStackTrace();
  }
 }
}

With this I model my Data Pipeline similar to before:

One important change though is the need for a defaultAvroSchema, which is easy enough in my case:

{
  "name": "mqtt",
  "type": "record",
  "fields": [
    {
      "name": "millis",
      "type": "timestamp-millis"
    },
    {
      "name": "temperature",
      "type": "double"
    }
  ]
}

This automatically creates table default\vora.MQTT and starts inserting my temperature information:

How cool is that.

 

To report this post you need to login first.

1 Comment

You must be Logged on to comment or reply to a post.

Leave a Reply