(#32) InfluxDB - Scratching the Surface
The last post outlined my first steps into using InfluxDB, a Time Series Database to store my IoT data. Persistence may not be futile after all.
Here’s what the vendor says:
InfluxData’s […] InfluxDB [will] handle massive amounts of time-stamped information. This time series database provides support for your metrics analysis needs, from DevOps Monitoring, IoT Sensor data, and Real-Time Analytics.
- Schemaless
- Self-Cleaning
- Simple
Schemaless
By schemaless I mean it’s easy to add new measurements, tags, and fields at any time. There’s no CREATE TABLE prerequisite and no ALTER TABLE step if things need to change.
Just start writing your data.
Now, there are some key concepts to learn when using InfluxDB: measurements, tags and fields. But the value in being schemaless is that you can just get started.
Just start ingesting and storing data.
Then refine and refactor as you learn.
Just do it!
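To make that concrete, here’s a minimal sketch using the same influxdb-java client you’ll see throughout this post. The "humidity" measurement, the "percentRH" field, and the localhost URL are all invented for illustration; the point is that nothing gets declared before the first write:

import java.util.concurrent.TimeUnit;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

public class SchemalessDemo
{
    public static void main (String[] args)
    {
        InfluxDB influxDB = InfluxDBFactory.connect( "http://localhost:8086", "notused", "notused" );

        // The database itself must exist (we create SolarData below), but the
        // measurement, tag, and field all spring into existence on this write.
        Point point = Point.measurement( "humidity" )
                           .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                           .tag( "machineID", "MID-02" )
                           .addField( "percentRH", 47.5 )
                           .build();

        influxDB.write( "SolarData", "autogen", point );
        influxDB.close();
    }
}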
Self-Cleaning
How often have your relational databases accumulated crud without anyone noticing? How often has your DBA or Sys Admin let you know that a table is getting big or a partition is full? How many times have you spun up a project to Archive Data? Any answer larger than “zero” is too many.
InfluxDB comes with the concept of a Retention Policy – it’ll expire (delete) old data without intervention.
That’s another reason why stashing IoT data in a conventional database is dumb. IoT data is inherently ephemeral. No one cares what the outside temperature was at 9:43a, three Thursdays ago! You did care! But now you don’t. Sensor data is evanescent!
A Retention Policy lets you tell the database how long to hang onto data. Here’s my Retention Policy for my IoT data:
Query query = new Query( "CREATE DATABASE SolarData WITH DURATION 30d" );
That’s it; my low-level, inherently transient data evaporates as it ages over 30 days.
See the InfluxDB documentation for everything InfluxDB can do with retention policies.
Now, you will care about trends in the data: minimums, maximums, and other statistics. To that end, InfluxDB also gives you Continuous Queries, which we’ll cover in detail later.
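As a quick taste (the cq_wpm_hourly and throughput_hourly names are invented for illustration), a Continuous Query that rolls raw throughput up into hourly statistics might look like this:

// A hypothetical hourly rollup of the "throughput" measurement we'll
// write later in this post; the summarized series outlives the raw
// points only if it's kept under a longer retention policy.
Query rollup = new Query(
        "CREATE CONTINUOUS QUERY cq_wpm_hourly ON SolarData BEGIN "
      + "SELECT mean(WPM) AS meanWPM, max(WPM) AS maxWPM "
      + "INTO throughput_hourly FROM throughput "
      + "GROUP BY time(1h), machineID END",
        "SolarData" );
influxDB.query( rollup );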
Simple
Let’s cover off on Simple by reviewing the code I created to pull machine data from the
MQTT broker and put it into InfluxDB using the Java client.
Connect to the InfluxDB Server
One call to connect, plus a quick ping to confirm the server is really there:
// Imports you'll need:
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;

InfluxDB influxDB = InfluxDBFactory.connect( databaseURL, "notused", "notused" );

Pong response = influxDB.ping();
if (response.getVersion().equalsIgnoreCase( "unknown" )) {
    log.error( "Error pinging the InfluxDB server. Make sure it's up and running!" );
    return;
} else {
    log.info( "Connected to InfluxDB Server. Version: " + response.getVersion() );
}
Create the Database and Retention Policy
Two calls. The great news is that if the database already exists, this step is harmless!
//
// Create the database, if it already exists, no harm is done
// Syntax: CREATE DATABASE <database_name> [WITH [DURATION <duration>]
// [REPLICATION <n>] [SHARD DURATION <duration>] [NAME <retention-policy-name>]]
Query query = new Query( "CREATE DATABASE " + databaseName + " WITH DURATION 30d", databaseName );
influxDB.query( query );
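If you want to confirm what the server actually set up, a quick sanity check (a sketch; SHOW RETENTION POLICIES is standard InfluxQL):

// Ask the server which retention policies the database now carries
QueryResult result = influxDB.query(
        new Query( "SHOW RETENTION POLICIES ON " + databaseName, databaseName ) );
log.info( "Retention policies: " + result.getResults() );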
Now we get into the specifics of my application.
Everything is an Event
In my application, events will come in as JSON formatted messages from the MQTT broker.
I have a Java POJO for each Event type. As a JSON message arrives, it’s inflated into the right Java POJO using Google’s Gson framework.
Here’s a snippet of the Alarm Message POJO that backs the Alarm Event:
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

public class AlarmMessage_POJO extends EventMessage_POJO
{
    private static final org.slf4j.Logger log =
            LoggerFactory.getLogger( AlarmMessage_POJO.class );

    /*
     * Here's what the JSON payload will look like:
     * {"alarmCode":16,"alarmMessage":"PC LOAD LETTER","topic":"CSEA/MID-02/ALARM/","machineID":"MID-02","dateTime":"2018-05-31T14:38:11.487","inBatchMode":"true"}
     */
    int alarmCode;
    String alarmMessage;

    public static AlarmMessage_POJO fromJSON (String jsonString)
    {
        try {
            Gson gson = new Gson();
            return gson.fromJson( jsonString, AlarmMessage_POJO.class );
        } catch (Exception ex) {
            log.error( "Error parsing message into JSON.", ex );
            log.error( "Original JSON [" + jsonString + "]" );
            return null;
        }
    }
< getters and setters follow … >
One more code snippet, the Java POJO that represents the Machine’s Throughput in WidgetsPerMinute:
public class WPM_POJO extends EventMessage_POJO
{
    private static final org.slf4j.Logger log =
            LoggerFactory.getLogger( WPM_POJO.class );

    /*
     * Here's what the JSON payload will look like:
     */
    int numEvents;
    int totalEvents;
    int intervalSeconds;
    float eventsPerMinute;

    public static WPM_POJO fromJSON (String jsonString)
    {
        try {
            Gson gson = new Gson();
            return gson.fromJson( jsonString, WPM_POJO.class );
        } catch (Exception ex) {
            log.error( "Error parsing message into JSON.", ex );
            log.error( "Original JSON [" + jsonString + "]" );
            return null;
        }
    }
< getters and setters follow … >
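Both POJOs extend EventMessage_POJO, which these snippets reference but don’t show. Here’s a minimal sketch of what the base class needs to provide; the guess that the topic string encodes the event type is mine:

import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

public class EventMessage_POJO
{
    private static final org.slf4j.Logger log =
            LoggerFactory.getLogger( EventMessage_POJO.class );

    String topic;
    String machineID;
    String dateTime;

    public static EventMessage_POJO fromJSON (String jsonString)
    {
        try {
            return new Gson().fromJson( jsonString, EventMessage_POJO.class );
        } catch (Exception ex) {
            log.error( "Error parsing message into JSON.", ex );
            return null;
        }
    }

    // Assumption: the topic carries the event type, as in "CSEA/MID-02/ALARM/"
    public boolean isAlarmMessage () { return topic != null && topic.contains( "/ALARM/" ); }
    public boolean isWPMMessage ()   { return topic != null && topic.contains( "/WPM/" ); }

    public String getTopic ()     { return topic; }
    public String getMachineID () { return machineID; }
}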
Pretty simple.
I'm doing one POJO per event type.
As each event is received, the JSON payload inflates the right object, which is then added to a BlockingQueue:
@Override
public void messageArrived (String topic, MqttMessage message) throws Exception
{
    byte[] payload = message.getPayload();
    String jsonMessage = new String( payload );

    // First pass: inflate just enough of the message to learn its type
    EventMessage_POJO anEvent = EventMessage_POJO.fromJSON( jsonMessage );
    if (anEvent == null)
        return;     // parse failure was already logged by fromJSON

    // Second pass: re-inflate into the specific POJO for this event type
    if (anEvent.isAlarmMessage())
        anEvent = AlarmMessage_POJO.fromJSON( jsonMessage );
    else if (anEvent.isWPMMessage())
        anEvent = WPM_POJO.fromJSON( jsonMessage );

    if (anEvent != null && this.eventQueue != null) {
        log.info( "Event received - putting (blocking call) on the queue" );
        this.eventQueue.put( anEvent );
    }
}
At this point, we have Machine Events represented as Java POJOs sitting in a BlockingQueue.
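The queue itself is nothing exotic; a one-line sketch of the structure shared between the two threads:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Producer: the MQTT callback. Consumer: the InfluxDB writer thread.
BlockingQueue<EventMessage_POJO> eventQueue = new LinkedBlockingQueue<>();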
Popping out to the other Java thread:
while (true) {
    //
    // Suck an event off the queue
    EventMessage_POJO anEvent = (EventMessage_POJO) eventQueue.take();
    log.info( "event message arrived. Machine: {} Topic: {}",
              anEvent.getMachineID(), anEvent.getTopic() );

    //
    // Let's try the Synchronous Write approach outlined in the README at
    // https://github.com/influxdata/influxdb-java
    Point dataPoint = null;
    BatchPoints batchPoints = BatchPoints
            .database( databaseName )
            .tag( "async", "true" )
            .consistency( ConsistencyLevel.ALL )
            .build();

    if (anEvent.isAlarmMessage()) {
        log.info( "Alarm event arrived - building InfluxDB Point object" );
        dataPoint = Point.measurement( "alarms" )
                         .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                         .tag( "machineID", anEvent.getMachineID() )
                         .addField( "CODE",
                                    ((AlarmMessage_POJO) anEvent).getAlarmCode() )
                         .build();
    } else if (anEvent.isWPMMessage()) {
        log.info( "WPM event arrived - building InfluxDB Point object" );
        dataPoint = Point.measurement( "throughput" )
                         .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                         .tag( "machineID", anEvent.getMachineID() )
                         .addField( "WPM",
                                    ((WPM_POJO) anEvent).getEventsPerMinute() )
                         .build();
    }

    //
    // Quick error check and if good, write data point to InfluxDB
    if (dataPoint != null) {
        batchPoints.point( dataPoint );
        influxDB.write( batchPoints );
        log.info( "Point written to InfluxDB successfully" );
    } else {
        log.error( "Event received but builder failed to build! dataPoint null!" );
    }
}
That’s pretty simple and will get you up and running quickly.
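One design note before we wrap up: the loop above builds a one-point BatchPoints per event. If write volume grows, influxdb-java can batch on your behalf instead; a sketch, assuming the default "autogen" retention policy name:

// Enable client-side batching: points are buffered and flushed when 100
// accumulate or 200 ms elapse, whichever comes first.
influxDB.enableBatch( 100, 200, TimeUnit.MILLISECONDS );

// Each event then needs only a single-point write; no BatchPoints required.
influxDB.write( databaseName, "autogen", dataPoint );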
We do need to cover off on measurements, fields, and tags: three essential concepts in InfluxDB.
That's the next post!