Monday, March 4, 2019

(#32) InfluxDB - Scratching the Surface


The last post outlined my first steps into using InfluxDB, a Time Series Database to store my IoT data.  Persistence may not be futile after all. 

Here’s what the vendor says:

InfluxData’s […] InfluxDB [will] handle massive amounts of time-stamped information. This time series database provides support for your metrics analysis needs, from DevOps Monitoring, IoT Sensor data, and Real-Time Analytics.

Jumping right in, here’s the list of my Current Likes for InfluxDB:
  •     Schemaless
  •     Self-Cleaning
  •     Simple


Schemaless

By schemaless I mean it's easy to add new measurements, tags, and fields at any time. There's no CREATE TABLE prerequisite and no ALTER TABLE step if things need to change.

Just start writing your data.

Now, there are some key concepts to learn when using InfluxDB: measurements, tags and fields. But the value in being schemaless is that you can just get started. 

Just start ingesting and storing data. 
Then refine and refactor as you learn.  
Just do it!
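
To make that concrete, here's a minimal sketch using the influxdb-java client (the same client used later in this post). The measurement, tag, and field names are invented for illustration, and I'm assuming an InfluxDB server at localhost:8086 – the point is that nothing needs declaring before the first write:

InfluxDB influxDB = InfluxDBFactory.connect( "http://localhost:8086", "notused", "notused" );

Point point = Point.measurement( "temperature" )            // hypothetical measurement
                   .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                   .tag( "location", "outside" )            // hypothetical tag
                   .addField( "degreesF", 41.3 )            // hypothetical field
                   .build();

influxDB.write( "SolarData", "autogen", point );            // all created on first write

No DDL anywhere. The measurement, the tag, and the field all spring into existence on that first write.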


Self-Cleaning

How often have your relational databases unknowingly accumulated crud? How often has your DBA or Sys Admin let you know that a table is getting big or a partition is full? How many times have you spun up a project to Archive Data?  Any answer larger than “zero” is too many.



InfluxDB comes with the concept of a Retention Policy – it’ll expire (delete) old data without intervention.


That’s another reason why stashing IoT data in a conventional database is dumb.  IoT data is inherently ephemeral.  No one cares what the outside temperature was at 9:43a, three Thursdays ago! You did care! But now you don’t.  Sensor data is evanescent!

A Retention Policy lets you tell the database how long to hang onto data.  Here’s my Retention Policy for my IoT data:

Query   query = new Query( "CREATE DATABASE SolarData WITH DURATION 30d" );

That’s it; my low-level, inherently transient data evaporates once it ages past 30 days.
See the InfluxDB documentation for everything retention policies can do.
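
And if 30 days turns out to be the wrong window, there's no archiving project – one statement adjusts it. A sketch, assuming the default policy (named autogen when you don't supply a NAME):

Query alter = new Query( "ALTER RETENTION POLICY autogen ON SolarData DURATION 14d", "SolarData" );
influxDB.query( alter );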

Now, you will care about trends in the data: minimums, maximums, and other statistics.  To that end, InfluxDB also gives you Continuous Queries.  We’ll cover Continuous Queries in detail later.
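
As a quick taste, though: a Continuous Query is created with a single statement, much like a retention policy. A hedged sketch – the measurement and field names below are placeholders, not my schema:

Query cq = new Query(
        "CREATE CONTINUOUS QUERY wpm_hourly ON SolarData BEGIN "
      + "SELECT max(WPM) INTO throughput_hourly FROM throughput GROUP BY time(1h) END",
        "SolarData" );
influxDB.query( cq );

The rolled-up values land in their own measurement, which can have a longer retention policy than the raw data feeding it.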


Simple

Let’s cover off on Simple by reviewing the code I created to pull machine data from the MQTT broker and put it into InfluxDB using the Java client.

Connect to the InfluxDB Server

One call:

InfluxDB influxDB = InfluxDBFactory.connect( databaseURL, "notused", "notused" );
Pong response = influxDB.ping();
if (response.getVersion().equalsIgnoreCase( "unknown" )) {
   log.error( "Error pinging the InfluxDB server. Make sure it’s up and running!" );
   return;
} else {
   log.info( "Connected to InfluxDB Server. Version: " + response.getVersion() );
}   

Create the database and retention policy.

A single statement handles both. The great news is that if the database already exists, this step is harmless!

//
//  Create the database, if it already exists, no harm is done
//  Syntax: CREATE DATABASE <database_name> [WITH [DURATION <duration>]
//   [REPLICATION <n>] [SHARD DURATION <duration>] [NAME <retention-policy-name>]]
Query   query = new Query( "CREATE DATABASE " + databaseName + " WITH DURATION 30d", databaseName );
influxDB.query( query );



Now we get into the specifics of my application.

Everything is an Event

In my application, events will come in as JSON formatted messages from the MQTT broker.

I have a Java POJO for each Event type. As a JSON message arrives, it’s inflated into the right Java POJO using Google’s Gson framework.


Here’s a snippet of the Alarm Message POJO that backs the Alarm Event:

public class AlarmMessage_POJO extends EventMessage_POJO
{

    private static final org.slf4j.Logger log =
                LoggerFactory.getLogger( AlarmMessage_POJO.class );

    /*
     * Here’s what the JSON payload will look like:
     *     {"alarmCode":16,"alarmMessage":"PC LOAD LETTER","topic":"CSEA/MID-02/ALARM/","machineID":"MID-02","dateTime":"2018-05-31T14:38:11.487","inBatchMode":"true"}
     *
     */

    int     alarmCode;
    String  alarmMessage;
   

    public static AlarmMessage_POJO fromJSON (String jsonString)
    {
        try {
            Gson gson = new Gson();
            return gson.fromJson( jsonString, AlarmMessage_POJO.class );

        } catch (Exception ex) {
            log.error( "Error parsing message into JSON.", ex );
            log.error( "Original JSON [" + jsonString + "]" );
            return null;
        }
    }

    < getters and setters follow … >



One more code snippet – the Java POJO that represents the Machine’s Throughput in WidgetsPerMinute:

public class WPM_POJO extends EventMessage_POJO
{
    private static final org.slf4j.Logger log =
                 LoggerFactory.getLogger(WPM_POJO.class);

    /*
     * Here’s what the JSON payload will look like:
     * {"numEvents":9,"totalEvents":3894,"intervalSeconds":60,"eventsPerMinute":9.0,"topic":"CSEA/MID -01/WPM_EVENT/","machineID":"MID-01","dateTime":"2018-05-30T09:36:11.468"," inBatchMode ":"true"}
     */

    int    numEvents;
    int    totalEvents;
    int    intervalSeconds;
    float  eventsPerMinute;
   
    public static WPM_POJO fromJSON (String jsonString)
    {
        try {
            Gson gson = new Gson();
            return gson.fromJson( jsonString, WPM_POJO.class );
        } catch (Exception ex) {
            log.error("Error parsing message into JSON.", ex);
            log.error("Original JSON [" + jsonString + "]");
            return null;
        }
    }
     < getters and setters follow … >
            

Pretty simple. 
I'm doing one POJO per event type.
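
For completeness, here's a sketch of what the common base class could look like. The original isn't shown in this post, so the field names and the topic-sniffing tests below are inferred from how the class is used – treat them as assumptions:

public class EventMessage_POJO
{
    // Fields common to every JSON payload shown above
    String  topic;
    String  machineID;
    String  dateTime;
    String  inBatchMode;

    public static EventMessage_POJO fromJSON (String jsonString)
    {
        try {
            return new Gson().fromJson( jsonString, EventMessage_POJO.class );
        } catch (Exception ex) {
            return null;
        }
    }

    // Assumption: the MQTT topic carries the event type, e.g. "CSEA/MID-02/ALARM/"
    public boolean isAlarmMessage ()  { return topic != null && topic.contains( "ALARM" ); }
    public boolean isWPMMessage ()    { return topic != null && topic.contains( "WPM_EVENT" ); }

    public String getMachineID ()  { return machineID; }
    public String getTopic ()      { return topic; }
}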


As each event is received, the JSON payload is inflated into the right object, which is added to a BlockingQueue:

@Override
public void messageArrived (String topic, MqttMessage message) throws Exception
{
    byte[]  payload = message.getPayload();
    String  jsonMessage = new String( payload );

    //
    // First parse tells us the event type; then re-parse into the right subclass
    EventMessage_POJO  anEvent = EventMessage_POJO.fromJSON( jsonMessage );
    if (anEvent == null)
        return;                 // fromJSON already logged the parse failure

    if (anEvent.isAlarmMessage())
        anEvent = AlarmMessage_POJO.fromJSON( jsonMessage );
    else if (anEvent.isWPMMessage())
        anEvent = WPM_POJO.fromJSON( jsonMessage );

    if (anEvent != null && this.eventQueue != null) {
        log.info( "Event received - putting (blocking call) on the queue" );
        this.eventQueue.put( anEvent );
    }
}

At this point, we have Machine Events represented as Java POJO objects sitting in a BlockingQueue.



Popping out to the other Java thread:

while (true) {
    //
    // Suck an event off the queue
    EventMessage_POJO anEvent = (EventMessage_POJO) eventQueue.take();
    log.info( "event message arrived. Machine: {}  Topic: {}", 
               anEvent.getMachineID(), anEvent.getTopic()  );
           
    //
    // Let’s try the Synchronous Write approach outlined in the README at
    // https://github.com/influxdata/influxdb-java
    Point   dataPoint = null;
    BatchPoints batchPoints = BatchPoints.database( databaseName )
                                         .tag( "async", "true" )
                                         .consistency( ConsistencyLevel.ALL )
                                         .build();

    if (anEvent.isAlarmMessage()) {
        log.info( "Alarm event arrived - building InfluxDB Point object" );
        dataPoint = Point.measurement( "alarms" )
                         .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                         .tag( "machineID", anEvent.machineID )
                         .addField( "CODE",
                                    ((AlarmMessage_POJO) anEvent).getAlarmCode() )
                        .build();



    } else if (anEvent.isWPMMessage()) {
        log.info( "WPM event arrived - building InfluxDB Point object" );
        dataPoint = Point.measurement( "throughput" )
                         .time( System.currentTimeMillis(), TimeUnit.MILLISECONDS )
                         .tag( "machineID", anEvent.getMachineID() )
                         .addField( "WPM", 
                               ((WPM_POJO) anEvent).getEventsPerMinute() )
                         .build();
               
    }  

    //
    // Quick error check and if good, write data point to InfluxDB
    if (dataPoint != null) {
        batchPoints.point( dataPoint );
        influxDB.write( batchPoints );
        log.info(  "Point written to InfluxDB successfully" );
    } else {
        log.error( "Event received but builder failed to build! dataPoint null!" );
    }   
}
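
Before moving on: reading the data back out is just as short. A sketch – the last hour of throughput across all machines:

Query query = new Query( "SELECT WPM FROM throughput WHERE time > now() - 1h", databaseName );
QueryResult result = influxDB.query( query );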

That’s pretty simple and will get you up and running quickly.
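
One possible refinement, sketched here rather than something I've put into production: the Java client can do the batching for you, buffering single-point writes and flushing on a count or time threshold:

influxDB.setDatabase( databaseName );
influxDB.enableBatch( 100, 200, TimeUnit.MILLISECONDS );  // flush at 100 points or every 200 ms

// then, per event, a bare single-point write - the client batches behind the scenes
influxDB.write( dataPoint );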

We do need to cover off on measurements, fields, and tags – three essential concepts in InfluxDB.

That's the next post!




Saturday, March 2, 2019

(#31) Persistence is Futile!


We are the IoT Sensors. Lower your shields and surrender your ships. Persistence is futile.


At least that’s what I think about trying to store IoT data in a database. Or, it’s what I thought!


I believed that the data volume and velocity of a large IoT ecosystem would swamp any data store. Persistence is Futile! More so if the target was a relational database.


I have other beefs about relational databases, mostly about their resemblance to a landline telephone in the 21st century.

But particularly in an IoT world, a world awash in sensor data.

Because a universal attribute of sensor data is time.

  • “What is the temperature now?”
  • “What was it yesterday?”
  • “What will it be this afternoon?”

While any database can store and manipulate time data, I’ll argue that few treat time as a first-class citizen.


And then I met InfluxDB.

InfluxDB is a product in the emerging field of Time Series Database (TSDB) technology.

InfluxDB is a data store where time is treated as a foundational element – time is more than just a datatype. A Time Series Database “…is a software system that is optimized for handling time series data, arrays of numbers indexed by time (a datetime or a datetime range)” (TSDB Wikipedia).

Asking time-based questions and getting time-based answers from InfluxDB is easy. It’s what a TSDB does.
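
For example, a hypothetical question and its one-line answer (the measurement and field names here are invented):

SELECT mean("tempF") FROM "weather" WHERE time > now() - 24h GROUP BY time(1h)

Hourly temperature averages for the last day. The time windowing is built into the query language.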

Let’s get to the meat and cover how I’ve put an InfluxDB into production in two scenarios. First, I have to call out the ease of installation. I am stunned, amazed and grateful (no Oxford comma) at the installation process!  

On my Ubuntu 18.04 LTS based system, the installation was trivial. No, that’s an overstatement. It was easier than trivial. It was ‘easier than falling off a log.’ A half dozen commands, ending with:
$ sudo apt-get install influxdb
$ sudo systemctl unmask influxdb.service
$ sudo systemctl start influxdb.service
And InfluxDB is up and running.
Waiting to ingest some data.
Boom!


InfluxDB is one of four components bundled in what the vendor calls their TICK stack. The other components I installed (with equal ease) were Telegraf and Chronograf. I skipped the installation of Kapacitor.
  • InfluxDB is the persistence layer for the time series data.
  • Telegraf is their tool for putting data into InfluxDB.
  • Chronograf is their visualization tool.
  • Kapacitor is their stream-processing and alerting engine (the one I skipped).

Let's start with Chronograf since that’s Eye Candy. 

Two commands bring Chronograf online:
$ wget https://dl.influxdata.com/chronograf/releases/chronograf.deb
$ sudo dpkg -i chronograf.deb

Then hop over to http://localhost:8888  (replace localhost with the server address where Chronograf is running.)

Solar Array Dashboard

Here’s my first example of a Chronograf Dashboard. This is a screenshot of data coming from my Solar Charging System setup.

These are real-time charts (i.e. the charts change as the data changes) that are showing a handful of key data items important to a small solar charging system.

I’m far from being an expert Chronograf user; I’m not even an intermediate user. I’ve spent about 90 minutes with the tool over the last several months.
This dashboard took me 15 minutes to create!


Let’s move down to the next layer in the TICK – ingestion of data. Telegraf is their tool that eases the chore of sending IoT data to InfluxDB.
Installation of Telegraf was, again, stunningly simple:
$ sudo apt-get install telegraf
$ sudo systemctl start telegraf
$ telegraf config > telegraf.conf


The last command creates a default configuration file in which, I believe, every ingestion method is disabled. All I had to do was bring that file into an editor and change a few lines:
[[inputs.mqtt_consumer]]
   servers = ["tcp://192.168.2.1:1883"]
   topics = [
      "LS1024B/1/DATA",
      "sensors/#",
   ]
   data_format = "json"


My whole home IoT ecosystem is already based on JSON and MQTT. All of my devices in the house send their data, formatted as JSON payloads, to an MQTT broker.
Thus, for me, four steps. Nothing more.
My Solar Panel System devices are flowing their data into InfluxDB!
Total elapsed time?
Maybe 30 minutes.


Crank it up a notch



Now, let’s switch to a slightly more complicated InfluxDB setup that I put into place at work.
For context, at work, we have a few packaging machines that we’ve tapped into for their event stream. These packaging machines spew events as they package packages. The machine events are formatted as JSON messages and routed to an MQTT broker.
I’ve written other applications in Python and Java to subscribe to the events and “do things” (draw business-related ‘conclusions’). “Java Application 1” uses Esper, a Complex Event Processing engine, to create business-relevant conclusions based on patterns in the event stream.
Skipping the gory details, “Java Application 1” looks for certain patterns that indicate something important has occurred. When a pattern is recognized, the Esper based Java program publishes a new event to the MQTT broker.

The points are:
  • Everything is an event
  • All events are represented as JSON messages and
  • All are flowing to an MQTT broker
“Java Application 2” subscribes to these events and puts them into InfluxDB using the Java client. Why not just use Telegraf again? I wanted hands-on experience with the Java client so I could compare the two.
Recap:
  • The packaging machines publish low-level, JSON formatted, events to MQTT
  • Java App 1 uses Esper to detect patterns in those events and publish new events, also in JSON, to MQTT
  • Java App 2 subscribes to all of these events and uses the InfluxDB Java client to store the data


Eye candy for openers:
Packaging Machines Status Dashboard

Again, this is a real-time dashboard that instantly conveys the status and business value of the machines: throughput, machine issues, and concerns that need attention.
Admittedly, I’ve not spent a lot of time with other visualization tools (Tableau, PowerBI, etc.), but the ease of creating the charts was surprising. It took me about 40 minutes to create that dashboard.
In subsequent posts I’ll go into more detail, show code, show the JSON and configurations.

There's more to InfluxDB than just eye candy!

Wednesday, June 13, 2018

(#30) Solar Charge Controller Communication


The last post covered off on the hardware and computer connection for creating a communication link with the EPSolar LandStar LS1024B controller.

Here we'll briefly cover off on the basic software approach to communicating with the controller.

As mentioned last time, the communication protocol used by the controller is an archaic one called Modbus.

With our USB/RS485 Adapter connected and plugged into a Linux machine we can start coding up the software.

There are Python libraries for modbus, but I chose to use a 'C' library for initial exploration.  I used libmodbus V3.1.4 from libmodbus.org on Ubuntu 16.04 LTS.

[ November 2018 Update: complete code for my solution is available in my GitHub Repository ]


Modbus and Libmodbus

Fortunately the API exposed by libmodbus is small; there's not a lot to master. And the documentation is good. Unfortunately, the terminology is modbus specific and I've already taken the stance that I have no desire to master an interface protocol for industrial devices from the late 70's.

So rather than dig into why a register isn't a coil and why there are three or four ways to actually read data, it was easier to just throw some code at it and see what works.


The C code


Right off the bat, include the right header files:

#include <errno.h>
#include <modbus/modbus.h>


#define LANDSTAR_1024B_ID       0x01

Then try to open the port to the controller. The settings are 115200 Baud, 8N1. My USB/RS485 adapter shows up as /dev/ttyUSB0:

int main (int argc, char* argv[]) 
{
    modbus_t    *ctx;
    
    puts( "Opening ttyUSB0, 115200 8N1" );
    ctx = modbus_new_rtu( "/dev/ttyUSB0", 115200, 'N', 8, 1 );
    if (ctx == NULL) {
        fprintf( stderr, "Error opening\n" );
        return -1;
    }

    printf( "Setting slave ID to %X\n", LANDSTAR_1024B_ID );

    modbus_set_slave( ctx, LANDSTAR_1024B_ID );
    <...>

If the port opens successfully, then try to connect:

    puts( "Connecting" );
    if (modbus_connect( ctx ) == -1) {
        fprintf( stderr, "Connection failed: %s\n", 
                 modbus_strerror( errno ) );
        modbus_free( ctx );
        return -1;

    }

Once the port is open, you need to consult the vendor documentation to see what 'modbus registers' hold the data we're interested in. 


Here's a snippet that shows how to pull what EPSolar calls the "real time" data:

static 
void    getRealTimeData (modbus_t *ctx)
{
    int         registerAddress = 0x3100;

    int         numBytes = 0x13;  // number of 16-bit registers to read;
                                  // 0x14 and up gives 'illegal data address'
    uint16_t    buffer[ 32 ];

    // zero everything out to start!    
    memset( buffer, '\0', sizeof buffer );
    
    // call 'read-input-registers', other read calls error
    if (modbus_read_input_registers( ctx, 
            registerAddress, numBytes, buffer ) == -1) {
        fprintf(stderr, "Read failed: %s\n", 
                modbus_strerror( errno ));
        return;
    }
    
    // ---------------------------------------------
    //  Photo Voltaic Values - Volts, Amps and Watts
    float pvArrayVoltage =  ((float) buffer[ 0x00 ]) / 100.0;
    float pvArrayCurrent =  ((float) buffer[ 0x01 ]) / 100.0;
    
    //
    // Assemble the Power (watts) value from two words
    long    temp = buffer[ 0x03 ] << 16;
    temp |= buffer[ 0x02 ];
    float pvArrayPower   =  (float) temp / 100.0;
    <..snip..>


A couple of notes...
There are a handful of 'read' calls in the modbus library. Again, rather than learn the modbus details, I just whacked at the code until one of the calls worked.

But the 'read-input-registers' call did not always work. More on this later.




Note the registerAddress variable that's set to 0x3100. Again, this value comes from the vendor documentation.  And the amount of data read, numBytes, can vary from 1 to MAX (where MAX seems to vary by device).

This maximum sometimes fell short of what the documentation said I could expect. For example, three additional fields are documented:

float batterySOC = ((float) buffer[ 0x1A ]) / 100.0;
float remoteBatteryTemp = ((float) buffer[ 0x1B ]) / 100.0;

float systemRatedVoltage = ((float) buffer[ 0x1D ]) / 100.0;

But it didn't work for my controller – setting numBytes higher than 0x13 gave an error.


I mentioned that 'read-input-registers' didn't always work. For the Settings stored at 0x9000, I had to use the 'read-registers' call.  Looking a little into the Modbus specifics, I can see that one call uses Modbus function code 0x04 and the other uses 0x03.   It's most likely because the settings at 0x9000 are both readable and writable.
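
Here's a hedged sketch of that settings read. The register addresses and meanings come from the vendor documentation, so treat the layout as an assumption:

static
void    getSettings (modbus_t *ctx)
{
    int         registerAddress = 0x9000;
    int         numRegisters    = 3;    // start small, grow until it errors
    uint16_t    buffer[ 32 ];

    memset( buffer, '\0', sizeof buffer );

    // Settings sit behind modbus function 0x03, so use
    // modbus_read_registers() rather than modbus_read_input_registers()
    if (modbus_read_registers( ctx, registerAddress, numRegisters, buffer ) == -1) {
        fprintf( stderr, "Read failed: %s\n", modbus_strerror( errno ) );
        return;
    }

    // What each word means is defined in the vendor documentation
    printf( "0x9000: %04X  0x9001: %04X  0x9002: %04X\n",
            buffer[ 0 ], buffer[ 1 ], buffer[ 2 ] );
}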

At some point in the future, I'll try the function write-registers (modbus function 0x10) to see if I can update the settings. 

(#29) And now for something completely different


Tinkering on the Home Automation front has slowed. I've got a number of projects in various states of completion: IKEA Tradfri lighting, OBD2 and CANBUS monitoring, GPS work, etc.  And Solar.



Since we bought the motorhome, my interest in solar charging of 12V systems has been growing.  A few months ago, I purchased a 100W solar panel, and a simple 10A solar charge controller to get started.

While this approach taught me a few things about solar charging systems at a low entry cost, the charge controller was dumb.  More sophisticated charge controllers, with communication capabilities, are out there.

Given my interest in IoT, a smarter controller was a necessity!


The Solar Charge Controller

Beijing Epsolar Technology Co is a popular manufacturer of affordable solar charging products. Their LandStar series of PWM Charge Controllers met my needs of being affordable and having a communication interface. 



Twenty dollars and three weeks later, the LS1024B PWM Solar Charge Controller from EPSolar arrived.



Other models from this vendor used a serial interface. This one uses a protocol called "modbus".  I knew nothing of this protocol before the controller arrived and I still know precious little. And I have little desire to master an archaic protocol, so take this next section with a huge 'grain of salt'.

Apparently you can use the Modbus protocol over a variety of interface specifications, but the LandStar uses Modbus RTU over RS485.  While I have no idea what that really means, I was able to Google up a number of matches to get something cobbled up that works.

USB / RS485 Adapter

The first purchase was a USB/Modbus dongle.  There are dozens available on eBay, Aliexpress, etc. I spent $3.50 for one; when plugged into Linux, it registers as "ID 1a86:7523 QinHeng Electronics HL-340 USB-Serial adapter":

Plug the adapter into your Linux machine and check dmesg to ensure the device is recognized; a "/dev/ttyUSBx" device should appear:

[ ] usb 1-2: new full-speed USB device number 3 using ohci-pci
[ ] usb 1-2: New USB device found, idVendor=1a86, idProduct=7523
[ ] usb 1-2: New USB device strings: Mfr=0, Product=2, SerialNumber=0
[ ] usb 1-2: Product: USB2.0-Serial
[ ] usbcore: registered new interface driver usbserial
[ ] usbcore: registered new interface driver usbserial_generic
[ ] usbserial: USB Serial support registered for generic
[ ] usbcore: registered new interface driver ch341
[ ] usbserial: USB Serial support registered for ch341-uart
[ ] ch341 1-2:1.0: ch341-uart converter detected
[ ] usb 1-2: ch341-uart converter now attached to ttyUSB0


Wiring it all up

Wiring of the device is simple.  Grab any old Ethernet cable (the connection on the LandStar controller is RJ45).  Cut one of the RJ45 plugs from the cable and strip the wires.  Find the wires that connect to pins 3 and 5 and trim back the insulation.



Wire 5 goes to the connection marked "A" on the adapter. Wire 3 goes to the connection marked "B".  Use a multimeter to ensure continuity and a good connection.  Then plug the adapter back into the Linux machine.

Plug the other end of the Ethernet cable into the COM port on the controller.  Connect the 12V battery to the charge controller and the LandStar will power up.



[ Ignore the DC-DC buck converter in the photo as I use this setup to power a Raspberry Pi. ]


With the hardware wired and connected, we can move on to the software – which is discussed in the next post!