Wednesday, July 30, 2014

(#14) Esper Example Continued - The Event Pojo 

The more time I spend with Esper, the more impressed I am at the low barrier to entry. In other words, getting a CEP platform up and running is easy.  The previous post displayed the code I came up with that instantiates the Esper CEP engine.

The second line of that code:

 cepConfig.addEventType( "HHBStatusEvent", HHBStatusEvent.class.getName() );


shows us informing Esper about the class we'll create to represent an event, HHBStatusEvent.

Now, there's more than one way to represent an event from your stream to Esper, but I chose what I felt was the simplest: a Plain Old Java Object (POJO).

Without further ado,

HHBStatusEvent.java


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author patrick.conroy
 */
public class HHBStatusEvent 
{
    java.util.Calendar  dateTime;                   // date/time event was created
    int                 deviceType;                 // HHB device type (eg. 23 = Motion Sensor)
    String              deviceTypeString;           // String of type (eg. "MOTION SENSOR")
    String              deviceName;                 // Sensor name (eg. "FRONT DOOR")
    String              deviceStatus;               // Sensor state (eg. "OPEN", "CLOSED", "MOTION", "NO MOTION")
    int                 statusDuration;             // How long (seconds) the sensor has been in this state
    String              alarmOnSetting1;            // See Note 1
    String              alarmOnSetting2;            // See Note 1
    String              callOnSetting1;             // See Note 2
    String              callOnSetting2;             // See Note 2
    boolean             deviceOnline;               // True or False
    boolean             batteryOK;                  // False if low or empty
    boolean             triggered;                  // True if sensor is in ALARM state
    String              macAddress;                 // MAC Address of this sensor
    
    // ------------
    // Note 1:  The HHB System can be set to 'alarm' the Key FOB based on the sensor type and state.
    //  Eg. You can configure the System to alarm when a Door Sensor is 'OPEN' and you could 
    //  configure the System to alarm when a Door Sensor is 'CLOSED'.  A motion sensor could alarm
    //  on 'MOTION', a Tilt Sensor could alarm on 'TILT' and a power sensor could alarm on 'OFF'.
    //  alarmOnSetting1 will be similar to "ALARM ON OPEN" and alarmOnSetting2 could be "NO ALARM ON CLOSED"
    //
    // Note 2: The system had (past tense) the capability to do the above but call into the Eaton
    //  servers and, in turn, call you or send a text message.  So everything said above is copied
    //  to these attributes, just replace 'ALARM' with 'CALL'.  For example the Tilt Sensor could be
    //  "CALL ON NO-TILT" for callOnSetting1 and "NO CALL ON TILT" for callOnSetting2
    
    
    //--------------------------------------------------------------------------
    @Override
    public  String  toString()
    {
        return "HHBStatusEvent(" +
                getDateTimeString() + "|" +
                this.deviceTypeString + "|" +
                this.deviceName + "|" +
                this.deviceStatus + "|" +
                this.statusDuration + "|" +
                this.triggered + "|" +
                this.macAddress + 
                ")";
    }
    
    //--------------------------------------------------------------------------
    public String   getDateTimeString() 
    {
        SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss z" );
        return sdf.format( this.dateTime.getTime() );
    }
    
    //--------------------------------------------------------------------------
    public  void    fromEventString (String eventString)
    {
        /*
        * Here are some examples
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 03 | OPEN-CLOSE SENSOR | Front Door | CLOSED | 14400 | ALARM ON OPEN | NO ALARM ON CLOSE | DO NOT CALL ON OPEN | DO NOT CALL ON CLOSE | ONLINE | BATTERY OK | CLEARED | 0000010228 |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 05 | WATER LEAK SENSOR | Basement Floor | DRY | 950400 | ALARM ON WET | NO ALARM ON DRY | DO NOT CALL ON WET | DO NOT CALL ON DRY | ONLINE | BATTERY OK | CLEARED | 0000011957 |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 23 | MOTION SENSOR | Motion Sensor | NO MOTION | 3600 | ALARM ON MOTION | NO ALARM ON NO MOTION | DO NOT CALL ON MOTION | DO NOT CALL ON NO MOTION | ONLINE | BATTERY OK | CLEARED | 000007AAAF |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 24 | TILT SENSOR | Garage Door Sensor | CLOSED | 3600 | ALARM ON OPEN | NO ALARM ON CLOSE | DO NOT CALL ON OPEN | DO NOT CALL ON CLOSE | ONLINE | BATTERY OK | CLEARED | 000000B357 |        
        */
        String regexDelimiters = "\\|";
        String[] tokens = eventString.split( regexDelimiters );
        
        String  topic           = tokens[ 0 ];
        this.dateTime           = createDateTime( tokens[ 1 ].trim() );
        this.deviceType         = Integer.parseInt( tokens[ 2 ].trim() );
        this.deviceTypeString   = tokens[ 3 ].trim();
        this.deviceName         = tokens[ 4 ].trim();
        this.deviceStatus       = tokens[ 5 ].trim();
        this.statusDuration     = Integer.parseInt( tokens[ 6 ].trim() );
        this.alarmOnSetting1    = tokens[ 7 ].trim();
        this.alarmOnSetting2    = tokens[ 8 ].trim();
        this.callOnSetting1     = tokens[ 9 ].trim();
        this.callOnSetting2     = tokens[ 10 ].trim();
        this.deviceOnline       = (tokens[ 11 ].trim().equalsIgnoreCase( "ONLINE" ) );
        this.batteryOK          = (tokens[ 12 ].trim().equalsIgnoreCase( "BATTERY OK" ) );
        this.triggered          = (tokens[ 13 ].trim().equalsIgnoreCase( "TRIGGERED" ) );
        this.macAddress         = tokens[ 14 ].trim();
    }
    
    //--------------------------------------------------------------------------
    private java.util.Calendar  createDateTime (String dateTimeStr)
    {
        Calendar    aCal = Calendar.getInstance();
        SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss z" );
        try {
            aCal.setTime( sdf.parse( dateTimeStr ) );
        } catch (ParseException ex) {
            //
            //  In the event of an error - lets just set the event time to "now"
            aCal.setTimeInMillis( System.currentTimeMillis() );
        }
        
        return aCal;
    }
    
    //--------------------------------------------------------------------------
    public Calendar getDateTime() {
        return dateTime;
    }

    public void setDateTime (Calendar dateTime) {
        this.dateTime = dateTime;
    }

    public int getDeviceType() {
        return deviceType;
    }

    public void setDeviceType (int deviceType) {
        this.deviceType = deviceType;
    }

    public String getDeviceTypeString() {
        return deviceTypeString;
    }

    public void setDeviceTypeString (String deviceTypeString) {
        this.deviceTypeString = deviceTypeString;
    }

    public String getDeviceName() {
        return deviceName;
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public String getDeviceStatus() {
        return deviceStatus;
    }

    public void setDeviceStatus(String deviceStatus) {
        this.deviceStatus = deviceStatus;
    }

    public int getStatusDuration() {
        return statusDuration;
    }

    public void setStatusDuration(int statusDuration) {
        this.statusDuration = statusDuration;
    }

    public String getAlarmOnSetting1() {
        return alarmOnSetting1;
    }

    public void setAlarmOnSetting1(String alarmOnSetting1) {
        this.alarmOnSetting1 = alarmOnSetting1;
    }

    public String getAlarmOnSetting2() {
        return alarmOnSetting2;
    }

    public void setAlarmOnSetting2(String alarmOnSetting2) {
        this.alarmOnSetting2 = alarmOnSetting2;
    }

    public String getCallOnSetting1() {
        return callOnSetting1;
    }

    public void setCallOnSetting1(String callOnSetting1) {
        this.callOnSetting1 = callOnSetting1;
    }

    public String getCallOnSetting2() {
        return callOnSetting2;
    }

    public void setCallOnSetting2(String callOnSetting2) {
        this.callOnSetting2 = callOnSetting2;
    }

    public boolean isDeviceOnline() {
        return deviceOnline;
    }

    public void setDeviceOnline(boolean deviceOnline) {
        this.deviceOnline = deviceOnline;
    }

    public boolean isBatteryOK() {
        return batteryOK;
    }

    public void setBatteryOK(boolean batteryOK) {
        this.batteryOK = batteryOK;
    }

    public boolean isTriggered() {
        return triggered;
    }

    public void setTriggered(boolean triggered) {
        this.triggered = triggered;
    }
    
    public String getMacAddress() {
        return macAddress;
    }

    public void setMacAddress(String macAddress) {
        this.macAddress = macAddress;
    }


}

Digging In

Starting right at the top of the class, there's a direct mapping between a field in the MQTT message that represents an event and an attribute in this class.

Here are the first few fields from an HHB/STATUS event from MQTT:

HHB/STATUS | 2014-05-15 11:31:50 -0600 | 03 | OPEN-CLOSE SENSOR | Front Door | CLOSED | 14400 | <snip>


And the first few attributes in the Java class:

public class HHBStatusEvent 
{
    java.util.Calendar  dateTime;             // date/time event was created
    int                 deviceType;           // HHB device type (eg. 23 = Motion Sensor)
    String              deviceTypeString;     // String of type (eg. "MOTION SENSOR")
    String              deviceName;           // Sensor name (eg. "FRONT DOOR")
    String              deviceStatus;         // Sensor state (eg. "OPEN", "CLOSED", "MOTION", "NO MOTION")
    int                 statusDuration;       // How long (seconds) the sensor has been in this state
    <snip!>

   
Only the getter and setter methods are required. The toString() and createDateTime() methods are there to help me out.
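
One thing worth noting about createDateTime(): if the timestamp can't be parsed, it quietly falls back to "now". If you'd rather know when that happens, here's a minimal sketch using java.time instead of SimpleDateFormat. The class name EventTimestamps is made up for this example and isn't part of my code above; it assumes timestamps always look like "2014-05-15 11:31:50 -0600".

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

// Hypothetical helper, not part of HHBStatusEvent as posted above
final class EventTimestamps
{
    // Matches timestamps such as "2014-05-15 11:31:50 -0600"
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss Z" );

    private EventTimestamps() { }

    // Returns the parsed timestamp, or empty when the text is missing or malformed
    static Optional<OffsetDateTime> parse (String text)
    {
        if (text == null) {
            return Optional.empty();
        }
        try {
            return Optional.of( OffsetDateTime.parse( text.trim(), FORMAT ) );
        } catch (DateTimeParseException ex) {
            return Optional.empty();
        }
    }
}
```

The caller can then decide whether an empty result means "use the current time" or "drop the event".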

The other not-essential-to-Esper method, fromEventString(), is an easy way for me to create an instance of an HHBStatusEvent object from the payload of an MQTT message.
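
fromEventString() trusts that every payload has all of its fields. A truncated message would throw an ArrayIndexOutOfBoundsException somewhere in the middle of the assignments. Here's a small sketch of a check you could run first. StatusLine and EXPECTED_FIELDS are names I've invented for illustration, and the count of 15 assumes the payload format shown in the examples above (the topic plus 14 attributes).

```java
// Hypothetical helper: splits one HHB/STATUS line into trimmed fields
final class StatusLine
{
    static final int EXPECTED_FIELDS = 15;      // topic + 14 attributes (assumed from the examples)

    private StatusLine() { }

    // Returns exactly EXPECTED_FIELDS trimmed tokens, or throws if the line is too short
    static String[] fields (String eventString)
    {
        String[] tokens = eventString.split( "\\|" );
        if (tokens.length < EXPECTED_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected " + EXPECTED_FIELDS + " fields but found " + tokens.length );
        }

        String[] fields = new String[ EXPECTED_FIELDS ];
        for (int i = 0; i < EXPECTED_FIELDS; i++) {
            fields[ i ] = tokens[ i ].trim();
        }
        return fields;
    }
}
```

With that in place, fields[ 2 ] is the device type, fields[ 5 ] is the device status, and fields[ 14 ] is the MAC address, matching the token indexes used in fromEventString().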

That's enough for now.
We'll look at the Listener class next. This class has the method that Esper will call when it finds a match for your events in the event stream.


Wednesday, July 2, 2014

(#13) Esper Code - Esper's Main 

Make it work, make it work right, make it work fast -- in that order.  Here's the code that's running two simple rules.  As mentioned earlier, the Esper documentation is short on step-by-step directions, so this post will try to remedy that.

I can add additional detail if requested, but I'll skip over the basics of downloading Esper, installing the JARs, and creating the libraries and dependencies for NetBeans (v7.4) and instead jump right into the code:


static void main (String args[])

Here's my main(), so far:


public static void main(String[] args) 
{
    //The Configuration is meant only as an initialization-time object.
    Configuration cepConfig = new Configuration();
    cepConfig.addEventType( "HHBStatusEvent", HHBStatusEvent.class.getName() );

    // We setup the engine
    EPServiceProvider cep = EPServiceProviderManager.getProvider( "myCEPEngine", cepConfig );
    EPRuntime cepRT = cep.getEPRuntime();


    // ----------------------------------------------------------------------------------------------------
    // Create and register the EPL for someone leaving
    String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

    EPAdministrator cepAdm = cep.getEPAdministrator();
    EPStatement departureStatement = cepAdm.createEPL( anEPLQuery );

    // ----------------------------------------------------------------------------------------------------
    // Create and register the EPL for someone arriving
    anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) " +
            " -> eventTwo = HHBStatusEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) " +
            "-> eventThree = HHBStatusEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' ) ].win:time( 10 minutes )";
    EPStatement arrivalStatement = cepAdm.createEPL( anEPLQuery );



    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://10.1.1.17:1883" );
    mqttClient.setClientID( "CEP Engine 1" );

    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    departureEventListener.setTheMQTTClient( mqttClient );
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    arrivalEventListener.setTheMQTTClient( mqttClient );
    arrivalStatement.addListener( arrivalEventListener );


    // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (true)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   

Bear in mind, my Esper experience totals, as of this writing, 90 minutes. So take these comments as "directional" and an attempt to be helpful. Your mileage may vary.

The first four lines are straight from the examples. Create a Configuration object, then register the POJO event class and save off a reference to the CEP Runtime object. In order to insert MQTT events into the stream, we'll need to invoke a method off the CEP Runtime object. 

The EPL Queries to Detect the Event Patterns

The next three lines, from anEPLQuery to departureStatement, create the EPL query for an event trigger.

String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

Consult the Esper documentation for the details, but this is the syntax I'm using to trigger on three events in a specific sequence within a 5 minute time window.

Recall each Eaton Home Heartbeat sensor is assigned a unique (Zigbee) MAC address. Sensor '0x12467' is the back door, sensor '0x0B357' is the garage door.

We've talked about it earlier, but this statement says "watch for a back door (12467) open event, followed by a garage door open event, followed by a garage door closed event", with all three occurring within a 5 minute window.

(The next lines work to create a similar EPLQuery for an arrival event, but with an extra sensor state tossed in and a 10 minute window.)
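
A caveat, given my 90 minutes of experience: as I read the Esper documentation, a ".win:time()" view on a pattern keeps the pattern's matches in a time window, while the "timer:within" pattern guard is what actually puts a stopwatch on part of a pattern. If the 5 minute limit turns out not to behave the way I expect, this is the variation I'd try. It's only a sketch that builds the EPL text: DeparturePattern and its parameters are hypothetical names, and I haven't confirmed that this is the right guard placement for my sensors, so treat it as an assumption to check against the docs.

```java
// Hypothetical helper that only builds the EPL text; it does not call Esper itself
final class DeparturePattern
{
    private DeparturePattern() { }

    // The guard wraps events two and three, so the stopwatch starts once eventOne has arrived
    static String epl (String backDoorMac, String garageDoorMac, int minutes)
    {
        return "SELECT * FROM PATTERN [ "
             + "every eventOne = HHBStatusEvent(macAddress='" + backDoorMac + "', deviceStatus='OPEN') "
             + "-> ( eventTwo = HHBStatusEvent(macAddress='" + garageDoorMac + "', deviceStatus='OPEN') "
             + "-> eventThree = HHBStatusEvent(macAddress='" + garageDoorMac + "', deviceStatus='CLOSED') "
             + ") where timer:within( " + minutes + " minutes ) ]";
    }
}
```

The resulting string would be handed to cepAdm.createEPL() exactly like the original query, for example DeparturePattern.epl( "0000012467", "000000B357", 5 ).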

Setup the MQTT Client Specifics 
Slide down to:
    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://10.1.1.17:1883" );
    mqttClient.setClientID( "CEP Engine 1" );


And we're setting up the Paho MQTT routines. My MQTT Broker is on a server at 10.1.1.17, listening on the default port of 1883.  That brings us to:


    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!


My HHBStatusEventGenerator class generates HHBStatusEvents.  The code will be posted but, no surprise, this class subscribes to MQTT HHB/STATUS events and then calls Esper's cepRT.sendEvent() method.
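
Until the real code is posted, here's a rough sketch of the idea, written without the Esper or Paho libraries so it stands on its own. StatusEventBridge is a made-up name and not my actual HHBStatusEventGenerator. The "parser" stands in for something like fromEventString(), and the "sink" stands in for a call to cepRT.sendEvent().

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the callback path; not the real HHBStatusEventGenerator
final class StatusEventBridge<E>
{
    private final String topic;
    private final Function<String, E> parser;   // payload text -> event object
    private final Consumer<E> sink;             // where events go, e.g. Esper's runtime

    StatusEventBridge (String topic, Function<String, E> parser, Consumer<E> sink)
    {
        this.topic = topic;
        this.parser = parser;
        this.sink = sink;
    }

    // Meant to be called from the MQTT client's message-arrived callback.
    // Returns true if the message was on our topic and was forwarded.
    boolean onMessage (String messageTopic, String payload)
    {
        if (!topic.equals( messageTopic )) {
            return false;                       // not ours, ignore it
        }
        sink.accept( parser.apply( payload ) );
        return true;
    }
}
```

Keeping the parser and the sink as plain functions also makes this piece easy to test without a broker or a CEP engine running.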


Do This When the Event Pattern is Detected
And that's how Esper will start streaming in events.  But not quite yet. The lines:

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    <...>
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    <...>
    arrivalStatement.addListener( arrivalEventListener );


create and register Event Listeners with Esper.  A method in each listener class will be called when the event pattern is detected.  

Do Forever
At this point, the setup is complete and we can tell MQTT that we're ready to start subscribing to the event stream.

    // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (!hellIsFrozenOver)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   


That's about it.  At this point, conceptually:
  • When an MQTT HHB/STATUS event arrives, its payload is extracted and a corresponding HHBStatusEvent object is instantiated
  • This HHBStatusEvent object is added to Esper's event stream
  • Esper watches the event stream for a pattern match against the 2 EPL statements (one arrival pattern, one departure pattern)
  • If a pattern match is detected, a method in the matching Listener object is invoked
  • At this point, it's up to us to decide what action to take on a pattern match



GOSUB 200 - The FTDI Device Driver 

I apologize for the interruption, but after spending a few hours on the 'net looking for a solution, I need to put down the information I've found in a spot where I can find it.  And that's right here.


If it's not broken...
With kernel 3.12, the FTDI device driver was changed to remove the ability to pass in a vendor and product ID.  The Eaton Home Heartbeat system has an FTDI USB-Serial chip embedded in it, with a product ID that's unknown to the current device driver.  

Earlier device driver releases let you overcome that by passing in the two new IDs:

root@pi# modprobe ftdi_sio vendor=0x0403 product=0xde29


That doesn't work anymore.  The correct way, as of this writing in July 2014, is to do the following. With the device unplugged, go root, then:

root@pi# modprobe ftdi_sio
root@pi# echo 0403 DE29 > /sys/bus/usb-serial/drivers/ftdi_sio/new_id

These two commands should execute without error and return immediately. If they do not, something else is amiss.

Try these manually at first to gain confidence that things are working.  Once you're sure, you can automate the driver configuration at boot time.



(Boot. Das Boot. Go SUB. Submarine, Das Boot. Get it? Huh???  Right???


I tell you, it all hangs together. I know at times it seems like a huge stretch, but I want you to know I take this whole 'blogging' stuff seriously!


If I didn't, I wouldn't kill hours searching for a movie poster about an obscure film with subtitles.)


Loading the FTDI Device Driver at (Das) Boot Time

You need two new files: a shell script and a udev rules file.  Create this shell script (owned by root:root), make it executable, and plunk it in /etc. Name it what you wish; mine's named "hhb_ftdi.sh".

#!/bin/sh
/sbin/modprobe -r ftdi_sio
/sbin/modprobe ftdi_sio
echo 0403 de29 > /sys/bus/usb-serial/drivers/ftdi_sio/new_id




Now move into /etc/udev/rules.d and create a rules file. Mine's called "95-hhb_ftdi.rules"

# Rules for hotplugging Eaton Home Heartbeat with FTDI (0x0403 0xDE29) Serial Adapter
SUBSYSTEM=="usb", ATTR{idVendor}=="0403", ATTR{idProduct}=="de29", RUN="/etc/hhb_ftdi.sh"
#
# These may be necessary to setup the correct permissions
KERNEL=="ttyUSB0", SYMLINK="hhb", GROUP="dialout", MODE="0660"
# KERNEL=="ttyUSB1", GROUP="dialout", MODE="0660"


The name of the rules file isn't too important. Just make sure it ends with ".rules".

Note the following: use "ATTR{idVendor}", not "SYSFS{idVendor}" as you'll find in older examples out on the 'net.  ATTR has replaced SYSFS.  If you use the old key, you'll get an error in /var/log/daemon.log stating "SYSFS{iDVendor} unknown key" or something to that effect.

Watch your "==" placement. Don't use the assignment "=" operator.  Check your spelling!

Note that this rule fires off the shell script.  Make sure the names match!  Note that I create a symbolic link, "/dev/hhb" to "/dev/ttyUSB0".  You don't have to do that if you don't want to.  In fact, start with the KERNEL line commented back out and see what you get.

Debugging udev rules is a known challenge.  Scan the kernel, syslog and daemon.log in /var/log for errors if things aren't working.

If all goes well, your Eaton Home Heartbeat device will be ready and waiting for you on either port "/dev/ttyUSB0" or "/dev/hhb" when you reboot.