Sunday, October 12, 2014

(#16) Writing the Wrongs - Esper Patterns 

I made a mistake.  After watching my Esper code run for several weeks I was consistently seeing triggers on event patterns that I didn't expect.  It's not that the triggers were wrong -- I'm not asserting a bug in Esper -- it's that I wasn't seeing results that I expected to see.


The ABC's 

Let's simplify what I'm after. If you recall, I'm looking for a Garage Door Open event, a Door Open event and then a Motion event as a pattern that I'm interested in.  

Simplify this to event A, event B then event C, where event A is the Garage Door Open Event, event B is the Door Open event and event C is the motion event.

I'm interested in A, then B, then C.  In Esper's EPL parlance, this is noted as "A -> B -> C".  Which is useful notation to adopt.


The Real World Intrudes

What was happening, in my house, occasionally was this:

  1. Garage Door Opens (e.g. A-1)
  2. Garage Door closes (don't care about this event)
  3. Time passes
  4. Garage Door Opens again (e.g. A-2)
  5. Door Opens (B-1)
  6. Motion Sensor triggers (C-1)
In my psuedo EPL notation I was seeing A1 -> A2 -> B1 -> C1.

What was I expecting?  I was expecting that my Esper would trigger on the last three: A2 -> B1 -> C1.   And that's not what I was seeing.  In my update listener code, I saw that the three events that were coming in were A1, B1, and C1.  I was getting the first A event, not the last as I had thought I'd see.

The Fault, Dear Brutus

I've just finished up an hour of playing with Esper and patterns. As you'd surmise, the results I'm getting are because of the EPL I used.  So let's explore a bit more on the EPL varations I tried and the results I got.


The test cases I used were all variations on the arrivals of A, B and C events with delays in between.  Test case 3, to pick one, is noted as: "A1, A2, d10 A3, B1, C1".



In English this would be: 
  • send A event (A1)
  • send A event (A2)
  • Delay 10 seconds
  • send A event (A3)
  • send B event (B1)
  • send C event (C1)

I created 4 Esper patterns in EPL and ran the application.

EPL-1
SELECT * FROM PATTERN
[ every 
 (eventOne = HHBAlarmEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) 
-> eventTwo = HHBAlarmEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) 
-> eventThree = HHBAlarmEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' )) where timer:within( 2 minutes )];

EPL-1 can be noted in my shorthand as [ every ( A-> B -> C) where timer:within ]


EPL-2
SELECT * FROM PATTERN
(eventOne = HHBAlarmEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) 
-> eventTwo = HHBAlarmEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) 
-> eventThree = HHBAlarmEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' )) where timer:within( 2 minutes )];

EPL-2 can be noted in my shorthand as [  ( A-> B -> C) where timer:within ]



EPL-3
SELECT * FROM PATTERN

( every eventOne = HHBAlarmEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) 
-> eventTwo = HHBAlarmEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) 
-> eventThree = HHBAlarmEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' )) where timer:within( 2 minutes )];



EPL-3 can be noted in my shorthand as [  (  every A-> B -> C) where timer:within ]



EPL-4
SELECT * FROM PATTERN

every eventOne = HHBAlarmEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) 
-> eventTwo = HHBAlarmEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) 
-> eventThree = HHBAlarmEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' ) where timer:within( 2 minutes )];

EPL-1 can be noted in my shorthand as [ every A-> B -> C where timer:within ]


You can see I'm moving the "every" and the parenthesis around and see what results I find.


The Test Cases

With the four EPL patterns ready, I modified the code to read patterns from a file, to add real-world timings. Then I created six test cases:



(Recall "d10" is my short hand for delay (wait) 10 seconds, d123 means delay 123 seconds.)

All four EPL patterns will be run against the test cases.  Next, I thought about the results that I wanted.  For example on test case 3, what I'd like to get is a trigger on the event sequence A3 -> B1 -> C1.

So I added a column to indicate what I was hoping to see from Esper:



Now, we could certainly disagree over what's to be expected or desired. Your needs / expectations could be different from mine.  For example in test case 5, you might possibly want to get notified on A1->B1-> C1 or A1->B3-> C1 or A3->B3-> C3.  What you expect is up to you.  I've just put down what I think I'd want to meet my needs.

The Results - Grouped by EPL

Test Case 1

Recall EPL-1 is [ every ( A-> B -> C) where timer:within ]. Let's look at the results:



Green denotes that I got what I was wanting to get.  Red means that I got something that I didn't want. Please, please, please note - red does not mean the results are wrong. Red means that the EPL I used didn't give me the results I was hoping for.

Think about Test Case 1, where I was hoping to get A3->B1->C1 and instead I got triggered on A1->B1->C1.  It's obviously correct but it's also reasonable to get that response from Esper.  So the chore becomes "what EPL will produce the results I'm after?"

Let me beat this horse a bit more - Esper responded. The EPL worked, my updateListerner method was called.  But when I examined the three events that triggered the listener object, sometimes I got the event objects I wanted (green) and sometimes I did not (red).

Let's keep going


Test Case 2

Recall EPL-2 removes the "every" keyword: [ ( A-> B -> C) where timer:within ]. The results were:




Again - green the results match what I expected. The red, I got something other than what I had wanted and now, yellow, means the trigger did not fire. Yellow means the updateListener method was not called.


The Yellow results on Test Case 5 puzzle me.  Yes, the time delta between A1 and C1 is outside the window. No trigger should fire.  But A3, B3 and all of the C events are within the two minute window and best I can tell, the updateListener did not get called.  That strikes me as odd.


Test Case 3

Recall EPL-3 puts the "every" keyword back but inside the parenthesis: [  (  every A-> B -> C) where timer:within ]The results were:


Interesting,  More cases where there was no trigger fired.  Again not what I was expecting.

Test Case 4

Finally EPL-4 removes the parenthesis: [  every A-> B -> C where timer:within ]The results were:






Conclusions

First I need to read more and get a better understanding of the EPL pattern syntax to see if my errors are obvious.  

The Community Responds!  Switch your EPL to use The Match Recognize Syntax!

Monday, August 4, 2014

(#15) Esper Example Continued - the Event Listener 


We've outlined the Esper CEP main code in post #13, and in post #14, we posted the Java code that backs up an event.  Here we'll cover off on the listener code -- the code that gets called when your event rule triggers.

Recall that we're looking for Departure Events and Arrival Events. Each occur when three other events happen in a specific sequence in a specific amount of time.

Here's the code for the Departure Event Listener:

public class DepartureEventListener implements UpdateListener 
{
    // Esper's using Log4J v1.2.17 - we might as well too
    private final org.apache.log4j.Logger logger = 
                 org.apache.log4j.Logger.getLogger( "DepartureEventListener" );

    private MQTTClient      theMQTTClient = null;
  
    
    // -------------------------------------------------------------------------
    @Override
    public void update(EventBean[] newData, EventBean[] oldData) 

    {
        logger.info( "Departure Event Listener - update");

        //
        // Note the names align to the EPL statement.  We called them 'eventOne'
        //  'eventTwo' and 'eventThree' in the EPL
        HHBStatusEvent  eventOne = (HHBStatusEvent) newData[ 0 ].get( "eventOne" );
        HHBStatusEvent  eventTwo = (HHBStatusEvent) newData[ 0 ].get( "eventTwo" );
        HHBStatusEvent  eventThree = (HHBStatusEvent) newData[ 0 ].get( "eventThree" );

        //
        // Calculate time -- in seconds -- between the three events
        long deltaTime1 = (eventTwo.getDateTime().getTimeInMillis() - 
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;

        long deltaTime2 = (eventThree.getDateTime().getTimeInMillis() -
                           eventTwo.getDateTime().getTimeInMillis()) / 1000L;

        long deltaTime3 = (eventThree.getDateTime().getTimeInMillis() -
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;

        //
        // Create the message payload...
        //  CEP/CONCLUSION | 2014-05-28 15:24:23 -0600 | DEPARTURE | EPL1 | E1:E2 10 | E2:E3 88 | E1:E3 98 |

        StringBuffer    sb = new StringBuffer();
        sb.append( "CEP/CONCLUSION" );        sb.append( " | " );
        sb.append( getCurrentDateTime() );    sb.append( " | " );
        sb.append( "DEPARTURE" );             sb.append( " | " );
        sb.append( "EPL1" );                  sb.append( " | " );
        sb.append( "E1:E2 " );        sb.append( deltaTime1 );            sb.append( " | " );
        sb.append( "E2:E3 " );        sb.append( deltaTime2 );            sb.append( " | " );
        sb.append( "E1:E3 " );        sb.append( deltaTime3 );            sb.append( " | " );
            
        logger.info( "Departure Event Listener - ready to call publish!" );       
        this.theMQTTClient.publishMessage( "CEP/CONCLUSION", sb.toString() );
        logger.info( "Departure Event Listener - publish called!" );       
    }

    // -------------------------------------------------------------------------
    public  void    setTheMQTTClient (MQTTClient theClient)
    {
        this.theMQTTClient = theClient;
    }
    
    //--------------------------------------------------------------------------
    private static  String  getCurrentDateTime()
    {
        Date now = new Date();
        SimpleDateFormat format = new SimpleDateFormat( "YYYY-MM-dd HH:mm:ss Z");
        
        return format.format( now );
    }
}

Things to note:
  1. The class implements the UpdateListener Interface
  2. There's one abstract method to implement, update()
  3. The update() method receives the events that matched the rule
  4. And you can do whatever you want to do when the rule triggers


Reacting to a Departure Event
In my case, I simply create another MQTT message and publish an event. The presumption is that there's another process, elsewhere on the network that will subscribe to these events and that the proper action.

It's very useful to have the events that matched and triggered the conclusion passed in. 


Wednesday, July 30, 2014

(#14) Esper Example Continued - The Event Pojo 

The more time I spend with Esper, the more impressed I am at the low barrier to entry. In other words, getting a CEP platform up and running is easy.  The previous post displayed the code I came up with that instantiates the Esper CEP engine.

The second line of that code:

 cepConfig.addEventType( "HHBStatusEvent", HHBStatusEvent.class.getName() );


shows us informing Esper about the class we'll create to represent an event, HHBStatusEvent.

Now, where's more than one way to represent an event, from your stream, to Esper but I chose what I felt was the most simple, a Plain Old Java Object (POJO).

Without further ado,

HHBStatusEvent.java


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author patrick.conroy
 */
public class HHBStatusEvent 
{
    java.util.Calendar  dateTime;                   // date/time event was created
    int                 deviceType;                 // HHB device type (eg. 23 = Motion Sensor)
    String              deviceTypeString;           // String of type (eg. "MOTION SENSOR")
    String              deviceName;                 // Sensor name (eg. "FRONT DOOR")
    String              deviceStatus;               // Sensor state (eg. "OPEN", "CLOSED", "MOTION", "NO MOTION")
    int                 statusDuration;             // How long (seconds) the sensor has been in this state
    String              alarmOnSetting1;            // See Note 1
    String              alarmOnSetting2;            // See Note 1
    String              callOnSetting1;             // See Note 2
    String              callOnSetting2;             // See Note 2
    boolean             deviceOnline;               // True or False
    boolean             batteryOK;                  // False if low or empty
    boolean             triggered;                  // True if sensor is in ALARM state
    String              macAddress;                 // MAC Address of this sensor
    
    // ------------
    // Note 1:  The HHB System can be set to 'alarm' the Key FOB based on the sensor type and state/
    //  Eg. You can configure the System to alarm when a Door Sensor is 'OPEN' and you could 
    //  configure the System to alarm when a Door Sensor is 'CLOSED'.  A motion sensor could alarm
    //  on 'MOTION', a Tilt Sensor could alarm on 'TILT' and a power sensor could alarm on 'OFF'
    //  alarmSetting1 will be similar to "ALARM ON OPEN" and alarmSetting2 could be "NO ALARM ON CLOSED"
    //
    // Note 2: The system had (past tense) the capability to do the above but call into the Eaton
    //  servers and, in turn, call you or send a text message.  So everything said above if copied
    //  to these attributes, just replace 'ALARM' with 'CALL'.  For example the Tilt Sensor could be
    //  "CALL ON NO-TILT" for callOnSetting1 and "NO CALL ON TILT" for callOnSetting2
    
    
    //--------------------------------------------------------------------------
    @Override
    public  String  toString()
    {
        return "HHBStatusEvent(" +
                getDateTimeString() + "|" +
                this.deviceTypeString + "|" +
                this.deviceName + "|" +
                this.deviceStatus + "|" +
                this.statusDuration + "|" +
                this.triggered + "|" +
                this.macAddress + 
                ")";
    }
    
    //--------------------------------------------------------------------------
    public String   getDateTimeString() 
    {
        SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss z" );
        return sdf.format( this.dateTime.getTime() );
    }
    
    //--------------------------------------------------------------------------
    public  void    fromEventString (String eventString)
    {
        /*
        * Here are some some examples
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 03 | OPEN-CLOSE SENSOR | Front Door | CLOSED | 14400 | ALARM ON OPEN | NO ALARM ON CLOSE | DO NOT CALL ON OPEN | DO NOT CALL ON CLOSE | ONLINE | BATTERY OK | CLEARED | 0000010228 |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 05 | WATER LEAK SENSOR | Basement Floor | DRY | 950400 | ALARM ON WET | NO ALARM ON DRY | DO NOT CALL ON WET | DO NOT CALL ON DRY | ONLINE | BATTERY OK | CLEARED | 0000011957 |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 23 | MOTION SENSOR | Motion Sensor | NO MOTION | 3600 | ALARM ON MOTION | NO ALARM ON NO MOTION | DO NOT CALL ON MOTION | DO NOT CALL ON NO MOTION | ONLINE | BATTERY OK | CLEARED | 000007AAAF |
HHB/STATUS | 2014-05-15 11:31:50 -0600 | 24 | TILT SENSOR | Garage Door Sensor | CLOSED | 3600 | ALARM ON OPEN | NO ALARM ON CLOSE | DO NOT CALL ON OPEN | DO NOT CALL ON CLOSE | ONLINE | BATTERY OK | CLEARED | 000000B357 |        
        */
        String regexDelimiters = "\\|";
        String[] tokens = eventString.split( regexDelimiters );
        
        String  topic           = tokens[ 0 ];
        this.dateTime           = createDateTime( tokens[ 1 ].trim() );
        this.deviceType         = Integer.parseInt( tokens[ 2 ].trim() );
        this.deviceTypeString   = tokens[ 3 ].trim();
        this.deviceName         = tokens[ 4 ].trim();
        this.deviceStatus       = tokens[ 5 ].trim();
        this.statusDuration     = Integer.parseInt( tokens[ 6 ].trim() );
        this.alarmOnSetting1    = tokens[ 7 ].trim();
        this.alarmOnSetting2    = tokens[ 8 ].trim();
        this.callOnSetting1     = tokens[ 9 ].trim();
        this.callOnSetting2     = tokens[ 10 ].trim();
        this.deviceOnline       = (tokens[ 11 ].trim().equalsIgnoreCase( "ONLINE" ) );
        this.batteryOK          = (tokens[ 12 ].trim().equalsIgnoreCase( "BATTERY OK" ) );
        this.triggered          = (tokens[ 13 ].trim().equalsIgnoreCase( "TRIGGERED" ) );
        this.macAddress         = tokens[ 14 ].trim();
    }
    
    //--------------------------------------------------------------------------
    private java.util.Calendar  createDateTime (String dateTimeStr)
    {
        Calendar    aCal = Calendar.getInstance();
        SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss z" );
        try {
            aCal.setTime( sdf.parse( dateTimeStr ) );
        } catch (ParseException ex) {
            //
            //  In the event of an error - lets just set the event time to "now"
            aCal.setTimeInMillis( System.currentTimeMillis() );
        }
        
        return aCal;
    }
    
    //--------------------------------------------------------------------------
    public Calendar getDateTime() {
        return dateTime;
    }

    public void setDateTime (Calendar dateTime) {
        this.dateTime = dateTime;
    }

    public int getDeviceType() {
        return deviceType;
    }

    public void setDeviceType (int deviceType) {
        this.deviceType = deviceType;
    }

    public String getDeviceTypeString() {
        return deviceTypeString;
    }

    public void setDeviceTypeString (String deviceTypeString) {
        this.deviceTypeString = deviceTypeString;
    }

    public String getDeviceName() {
        return deviceName;
    }

    public void setDeviceName(String deviceName) {
        this.deviceName = deviceName;
    }

    public String getDeviceStatus() {
        return deviceStatus;
    }

    public void setDeviceStatus(String deviceStatus) {
        this.deviceStatus = deviceStatus;
    }

    public int getStatusDuration() {
        return statusDuration;
    }

    public void setStatusDuration(int statusDuration) {
        this.statusDuration = statusDuration;
    }

    public String getAlarmOnSetting1() {
        return alarmOnSetting1;
    }

    public void setAlarmOnSetting1(String alarmOnSetting1) {
        this.alarmOnSetting1 = alarmOnSetting1;
    }

    public String getAlarmOnSetting2() {
        return alarmOnSetting2;
    }

    public void setAlarmOnSetting2(String alarmOnSetting2) {
        this.alarmOnSetting2 = alarmOnSetting2;
    }

    public String getCallOnSetting1() {
        return callOnSetting1;
    }

    public void setCallOnSetting1(String callOnSetting1) {
        this.callOnSetting1 = callOnSetting1;
    }

    public String getCallOnSetting2() {
        return callOnSetting2;
    }

    public void setCallOnSetting2(String callOnSetting2) {
        this.callOnSetting2 = callOnSetting2;
    }

    public boolean isDeviceOnline() {
        return deviceOnline;
    }

    public void setDeviceOnline(boolean deviceOnline) {
        this.deviceOnline = deviceOnline;
    }

    public boolean isBatteryOK() {
        return batteryOK;
    }

    public void setBatteryOK(boolean batteryOK) {
        this.batteryOK = batteryOK;
    }

    public boolean isTriggered() {
        return triggered;
    }

    public void setTriggered(boolean triggered) {
        this.triggered = triggered;
    }
    
    public String getMacAddress() {
        return macAddress;
    }

    public void setmacAddress(String macAddress) {
        this.macAddress = macAddress;
    }


}

Digging In

Starting right at the top of the class, there's a direct mapping between a field in the MQTT message that represent an event and an attribute in this class.

Here's the first few fields from an HHB/STATUS event from MQTT:

HHB/STATUS | 2014-05-15 11:31:50 -0600 | 03 | OPEN-CLOSE SENSOR | Front Door | CLOSED | 14400 | <snip>


And the first few attributes in the Java class:

public class HHBStatusEvent 
{
    java.util.Calendar  dateTime;             // date/time event was created
    int                 deviceType;           // HHB device type (eg. 23 = Motion Sensor)
    String              deviceTypeString;     // String of type (eg. "MOTION SENSOR")
    String              deviceName;           // Sensor name (eg. "FRONT DOOR")
    String              deviceStatus;         // Sensor state (eg. "OPEN", "CLOSED", "MOTION", "NO MOTION")
    int                 statusDuration;       // How long (seconds) the sensor has been in this state
    <snip!>

   
Only the getter and setter methods are required. The toString() and createDateTime() methods are there to help me out.

The other not-essential-to-Esper method, fromEventString() is an easy way for me to create an instance of an HHBStatusEvent object from the payload of an MQTT message.

That's enough for now.
We'll look at the Listener class next. This class has the method that Esper will call when it finds a match for your events in the event stream.


Wednesday, July 2, 2014

(#13) Esper Code - Esper's Main 

Make it work, make it work right, make it work fast -- in that order.  Here's the code that's running two simple rules.  As mentioned earlier, the Esper documentation lacks on step-by-step directions. So this post will try to remedy that.

I can add additional detail if requested, but I'll skip over the basics of downloading Esper, installing the JARs, and creating the libraries and dependencies for NetBeans (v7.4) and instead jump right into the code:


static void main (String args[])

Here's my main(), so far:


public static void main(String[] args) 
{
    //The Configuration is meant only as an initialization-time object.
    Configuration cepConfig = new Configuration();
    cepConfig.addEventType( "HHBStatusEvent", HHBStatusEvent.class.getName() );

    // We setup the engine
    EPServiceProvider cep = EPServiceProviderManager.getProvider( "myCEPEngine", cepConfig );
    EPRuntime cepRT = cep.getEPRuntime();


    // ----------------------------------------------------------------------------------------------------
    // Creste and regsiter the EPL for someone leaving
    String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

    EPAdministrator cepAdm = cep.getEPAdministrator();
    EPStatement departureStatement = cepAdm.createEPL( anEPLQuery );

    // ----------------------------------------------------------------------------------------------------
    // Creste and regsiter the EPL for someone arriving
    anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) " +
            " -> eventTwo = HHBStatusEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) " +
            "-> eventThree = HHBStatusEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' ) ].win:time( 10 minutes )";
    EPStatement arrivalStatement = cepAdm.createEPL( anEPLQuery );



    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://192.168.1.11:1883" );
    mqttClient.setClientID( "CEP Engine 1" );

    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    departureEventListener.setTheMQTTClient( mqttClient );
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    arrivalEventListener.setTheMQTTClient( mqttClient );
    arrivalStatement.addListener( arrivalEventListener );


    // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (true)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   

Bear in mind, my Esper experience totals up, as of this writing, at 90 minutes. So take these comments as "directional" and an attempt to be helpful. Your mileage may vary.

The first four lines are straight from the examples. Create a Configuration object, then register the POJO event class and save off a reference to the CEP Runtime object. In order to insert MQTT events into the stream, we'll need to invoke a method off the CEP Runtime object. 

The EPL Queries to Detect the Event Patterns

The next three lines, from anEPLQuery to departureStatement, create the EPL query for an event trigger.

String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

Consult the Esper documentation but this is the syntax for triggering on three events in a specific sequence within a 5 minute time window.

Recall each Eaton Home Heartbeat sensor is assigned a unique (Zibgee) MAC address. Sensor '0x12467' is the back door, sensor '0x0B357' is the garage door.

We've talked about it earlier, but this statement says "watch for a door (12647) open event followed by a garage door open event followed by a garage door closed event -- all three occurring within a 5 minute window.

(The next lines work to create a similar EPLQuery for an arrival event, but with an extra sensor state tossed in and a 10 minute window.)

Setup the MQTT Client Specifics 
Slide down to:
    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://10.1.1.17:1883" );
    mqttClient.setClientID( "CEP Engine 1" );


And were setting up the Paho MQTT routines. My MQTT Broker is on  a server at 10.1.1.17, listening on the default port of 1883.  That brings us to:


    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!


My HHBStatusEventGenerator class generates HHBStatusEvents.  The code will be posted but, no surprise, this class subscribes to MQTT HHB/STATUS events and then calls Esper's cepRT.sendEvent() method.


Do This When the Event Pattern is Detected
And that's how Esper will start streaming in events.  But not quite yet. The lines:

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    <...>
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    <...>
    arrivalStatement.addListener( arrivalEventListener );


create and register Event Listeners with Esper.  A method in each listener class will be called with the event pattern is detected.  

Do Forever
At this point, the setup is complete and we can tell MQTT that we're ready to start subscribing to the event stream.

 // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (!hellIsFrozenOver)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   


That's about it.  At this point, conceptually:
  • When an MQTT HHB/STATUS event arrives, it's payload is extracted and a corresponding HHBStatusEvent object is instantiated
  • This HHBStatusEvent object is added to Esper's event stream
  • Esper watches the event stream for a pattern match against the 2 EPL statements (one arrival pattern, one departure pattern)
  • If a pattern match is detected, a method in the matching Listener object is invoked
  • At this point, it's up to us to decide what action to take on a pattern match



GOSUB 200 - The FTDI Device Driver 

I apologize for the interruption, but after spending a few hours on the 'net looking for a solution, I need to put down the information I've found in a spot where I can find it.  And that's right here.


If it's not broken...
With kernel 3.12, the FTDI device driver was changed to remove the ability to pass in a vendor and product ID.  The Eaton Home Heartbeat system has an FTDI USB-Serial chip embedded in it, with a product ID that's unknown to the current device driver.  

Earlier device driver releases let you overcome that by passing in the two new IDs:

root@pi# modprobe ftdi_sio vendor=0x0403 product=0xde29


That doesn't work anymore.  The correct way, as of this writing in July 2014, is to do the following. With device unplugged, go root, then:

root@pi# modprobe ftdi_sio
root@pi# echo 0403 DE29 > /sys/bus/usb-serial/drivers/ftdi_sio/new_id

These two command should execute without error and return immediately. If they do not, something else is amiss.

Try these manually at first to gain confidence that things are working.  Once you're sure, you can automate the driver configuration at boot time.



(Boot. Das Boot. Go SUB. Submarine, Das Boot. Get it? Huh???  Right???


I tell you, it all hangs together. I know at times it seems like a huge stretch, but I want you to know I take this whole 'blogging' stuff seriously!


If I didn't, I wouldn't kill hours searching for a movie poster about an obscure film with subtitles.)


Loading the FTDI Device Driver at (Das) Boot Time

You need two new files, a shell script and a udev rules file.  Create this shell script (owned by root:root) and plunk it in /etc (name it what you wish. Mine's named "hhb_ftdi.sh"

#!/bin/sh
/sbin/modprobe -r ftdi_sio
/sbin/modprobe ftdi_sio
echo 0403 de29 > /sys/bus/usb-serial/drivers/ftdi_sio/new_id




Now move into /etc/udev/rules.d and create a rules file. Mine's called "95-hhb_ftdi.rules"

# Rules for hotplugging Eaton Home Heartbest with FTDI (0x0403 0xDE29) Serial Adapter
SUBSYSTEM=="usb", ATTR{idVendor}=="0403", ATTR{idProduct}=="de29", RUN="/etc/hhb_ftdi.sh"
#
# These may be necessary to setup the correct permissions
KERNEL=="ttyUSB0", SYMLINK="hhb", GROUP="dialout", MODE="0660"
# KERNEL=="ttyUSB1", GROUP="dialout", MODE="0660"


The name of the rules file isn't too important. Just make sure it ends with ".rules".

Note the following - use "ATTR{idVendor}" not "SYSFS{idVendor}" as you'll find it out on the 'net.  ATTR has replaced SYSFS.  You'll get an error in /var/log/daemon.log stating "SYSFS{iDVendor} unknown key" or something to that effect.

Watch your "==" placement. Don't use the assignment "=" operator.  Check your spelling!

Note that this rule fires off the shell script.  Make sure the names match!  Note that I create a symbolic link, "/dev/hhb" to "/dev/ttyUSB0".  You don't have to do that if you don't want to.  In fact, start with the KERNEL line commented back out and see what you get.

Debugging udev rules is a known challenge.  Scan the kernel, syslog and daemon.log in /var/log for errors if things aren't working.

If all goes well, you're Eaton Home Heartbeat device will be ready and waiting for you on either port "/dev/ttyUSB0" or "/dev/hhb" when you reboot.