import com.pi4j.io.i2c.I2CBus;
import com.pi4j.io.i2c.I2CFactory;
import com.pi4j.io.i2c.I2CDevice;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.sql.Timestamp;
public class Kafka {
public static void main(String[] args) {
try {
I2CBus i2cBus = I2CFactory.getInstance(I2CBus.BUS_1);
I2CDevice mcp9801 = i2cBus.getDevice(0x4f);
byte[] buffer = new byte[2];
Properties props = new Properties();
props.put("bootstrap.servers", "linux-p2i7:6667");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
int bytes = mcp9801.read(0, buffer, 0, 2);
Timestamp timestamp = new Timestamp(System.currentTimeMillis());
producer.send(new ProducerRecord<String, String>("raspberrypi", Integer.toString(0), timestamp + "," + Double.toString((double) buffer[0] - (double) buffer[1] / 256)));
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
{
"properties": {},
"description": "",
"processes": {
"kafkaconsumer1": {
"component": "com.sap.kafka.consumer",
"metadata": {
"label": "Kafka Consumer",
"x": 16,
"y": 12,
"height": 80,
"width": 120,
"config": {
"zookeepers": "linux-p2i7:2181",
"offset": "newest",
"groupId": "test_group",
"topics": "raspberrypi"
}
}
},
"sapvorainserter1": {
"component": "com.sap.vora.inserter",
"metadata": {
"label": "SAP Vora Inserter",
"x": 199.99999904632568,
"y": 12,
"height": 80,
"width": 120,
"config": {
"dsn": "v2://linux-p76x:2202/?binary=true",
"initStatements": "",
"tableColumnTypes": "Timestamp TIMESTAMP, Temperature Double",
"tableName": "RASPI"
}
}
}
},
"groups": [],
"connections": [
{
"metadata": {
"points": "140,52 167.99999952316284,52 167.99999952316284,61 195.99999904632568,61"
},
"src": {
"port": "message",
"process": "kafkaconsumer1"
},
"tgt": {
"port": "inmessage2",
"process": "sapvorainserter1"
}
}
],
"inports": {},
"outports": {}
}
{
"properties": {},
"description": "",
"processes": {
"kafkaconsumer21": {
"component": "com.sap.kafka.consumer2",
"metadata": {
"label": "Kafka Consumer2",
"x": 17,
"y": 12,
"height": 80,
"width": 120,
"config": {
"brokers": "ambari.com:6667",
"topics": "raspi",
"groupId": "test_group"
}
}
},
"voraavroingestor1": {
"component": "com.sap.vora.avroingestor",
"metadata": {
"label": "Vora Avro Ingestor",
"x": 186,
"y": 12,
"height": 80,
"width": 120,
"config": {
"defaultAvroSchema": "{\"name\":\"raspi\",\"type\":\"record\",\"fields\":[{\"name\":\"millis\",\"type\":\"timestamp-millis\"},{\"name\":\"temperature\",\"type\":\"double\"}]}",
"connectionType": "connection",
"connection": {
"configurationType": "Configuration Manager",
"connectionID": "VORA",
"connectionProperties": {}
},
"format": "csv"
}
}
}
},
"groups": [],
"connections": [
{
"metadata": {
"points": "141,52 181,52"
},
"src": {
"port": "message",
"process": "kafkaconsumer21"
},
"tgt": {
"port": "in",
"process": "voraavroingestor1"
}
}
],
"inports": {},
"outports": {}
}
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.
User | Count |
---|---|
53 | |
5 | |
4 | |
4 | |
4 | |
4 | |
3 | |
3 | |
3 | |
3 |