Wednesday, July 2, 2014

(#13) Esper Code - Esper's Main 

Make it work, make it work right, make it work fast -- in that order.  Here's the code that's running two simple rules.  As mentioned earlier, the Esper documentation lacks on step-by-step directions. So this post will try to remedy that.

I can add additional detail if requested, but I'll skip over the basics of downloading Esper, installing the JARs, and creating the libraries and dependencies for NetBeans (v7.4) and instead jump right into the code:


static void main (String args[])

Here's my main(), so far:


public static void main(String[] args) 
{
    //The Configuration is meant only as an initialization-time object.
    Configuration cepConfig = new Configuration();
    cepConfig.addEventType( "HHBStatusEvent", HHBStatusEvent.class.getName() );

    // We setup the engine
    EPServiceProvider cep = EPServiceProviderManager.getProvider( "myCEPEngine", cepConfig );
    EPRuntime cepRT = cep.getEPRuntime();


    // ----------------------------------------------------------------------------------------------------
    // Creste and regsiter the EPL for someone leaving
    String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

    EPAdministrator cepAdm = cep.getEPAdministrator();
    EPStatement departureStatement = cepAdm.createEPL( anEPLQuery );

    // ----------------------------------------------------------------------------------------------------
    // Creste and regsiter the EPL for someone arriving
    anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent( macAddress = '000000B357', deviceStatus = 'OPEN' ) " +
            " -> eventTwo = HHBStatusEvent( macAddress = '0000012467', deviceStatus = 'OPEN' ) " +
            "-> eventThree = HHBStatusEvent( macAddress = '000007AAAF', deviceStatus = 'MOTION' ) ].win:time( 10 minutes )";
    EPStatement arrivalStatement = cepAdm.createEPL( anEPLQuery );



    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://192.168.1.11:1883" );
    mqttClient.setClientID( "CEP Engine 1" );

    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    departureEventListener.setTheMQTTClient( mqttClient );
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    arrivalEventListener.setTheMQTTClient( mqttClient );
    arrivalStatement.addListener( arrivalEventListener );


    // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (true)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   

Bear in mind, my Esper experience totals up, as of this writing, at 90 minutes. So take these comments as "directional" and an attempt to be helpful. Your mileage may vary.

The first four lines are straight from the examples. Create a Configuration object, then register the POJO event class and save off a reference to the CEP Runtime object. In order to insert MQTT events into the stream, we'll need to invoke a method off the CEP Runtime object. 

The EPL Queries to Detect the Event Patterns

The next three lines, from anEPLQuery to departureStatement, create the EPL query for an event trigger.

String  anEPLQuery = "SELECT * FROM PATTERN " +
            " [  every eventOne = HHBStatusEvent(macAddress='0000012467',deviceStatus='OPEN') " +
            " -> eventTwo   = HHBStatusEvent(macAddress='000000B357',deviceStatus='OPEN' ) " +
            " -> eventThree = HHBStatusEvent(macAddress='000000B357',deviceStatus='CLOSED' ) ].win:time( 5 minutes )";

Consult the Esper documentation but this is the syntax for triggering on three events in a specific sequence within a 5 minute time window.

Recall each Eaton Home Heartbeat sensor is assigned a unique (Zibgee) MAC address. Sensor '0x12467' is the back door, sensor '0x0B357' is the garage door.

We've talked about it earlier, but this statement says "watch for a door (12647) open event followed by a garage door open event followed by a garage door closed event -- all three occurring within a 5 minute window.

(The next lines work to create a similar EPLQuery for an arrival event, but with an extra sensor state tossed in and a 10 minute window.)

Setup the MQTT Client Specifics 
Slide down to:
    //
    // Get the MQTT Stuff ready
    MQTTClient  mqttClient = new MQTTClient();
    mqttClient.setBrokerURL( "tcp://10.1.1.17:1883" );
    mqttClient.setClientID( "CEP Engine 1" );


And were setting up the Paho MQTT routines. My MQTT Broker is on  a server at 10.1.1.17, listening on the default port of 1883.  That brings us to:


    //
    // We need to create the Callback Class next - so we can pass it into MQTT
    HHBStatusEventGenerator  statusEventSource = new HHBStatusEventGenerator();
    mqttClient.setCepRuntime( cepRT );               // Do this first!


My HHBStatusEventGenerator class generates HHBStatusEvents.  The code will be posted but, no surprise, this class subscribes to MQTT HHB/STATUS events and then calls Esper's cepRT.sendEvent() method.


Do This When the Event Pattern is Detected
And that's how Esper will start streaming in events.  But not quite yet. The lines:

    // 
    // Don't start subscribing yet!  A bit more setup...
    // Register the callback
    DepartureEventListener departureEventListener = new DepartureEventListener();
    <...>
    departureStatement.addListener( departureEventListener );

    ArrivalEventListener arrivalEventListener = new ArrivalEventListener();
    <...>
    arrivalStatement.addListener( arrivalEventListener );


create and register Event Listeners with Esper.  A method in each listener class will be called with the event pattern is detected.  

Do Forever
At this point, the setup is complete and we can tell MQTT that we're ready to start subscribing to the event stream.

 // 
    // Now -- Connect to the broker and start subscribing!
    mqttClient.connect( "tcp://10.1.1.17:1883",  "CEP Engine 1" );
    mqttClient.subscribe( "HHB/STATUS" );

    while (!hellIsFrozenOver)
        try {
            Thread.sleep( 1000L );
        } catch (InterruptedException ex) {
            Logger.getLogger(CEPMQTT_1.class.getName()).log(Level.SEVERE, null, ex);
        }

}   


That's about it.  At this point, conceptually:
  • When an MQTT HHB/STATUS event arrives, it's payload is extracted and a corresponding HHBStatusEvent object is instantiated
  • This HHBStatusEvent object is added to Esper's event stream
  • Esper watches the event stream for a pattern match against the 2 EPL statements (one arrival pattern, one departure pattern)
  • If a pattern match is detected, a method in the matching Listener object is invoked
  • At this point, it's up to us to decide what action to take on a pattern match



No comments :

Post a Comment