Monday, August 4, 2014

(#15) Esper Example Continued - the Event Listener 


We outlined the Esper CEP main code in post #13, and in post #14 we posted the Java class that backs an event. Here we'll cover the listener code: the code that gets called when your event rule triggers.

Recall that we're looking for Departure Events and Arrival Events. Each occurs when three other events happen in a specific sequence within a specific amount of time.

Here's the code for the Departure Event Listener:

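// Package names below assume Esper 4.x/5.x (com.espertech.esper.client)
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

import java.text.SimpleDateFormat;
import java.util.Date;

public class DepartureEventListener implements UpdateListener 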
{
    // Esper's using Log4J v1.2.17 - we might as well too
    private final org.apache.log4j.Logger logger = 
                 org.apache.log4j.Logger.getLogger( "DepartureEventListener" );

    private MQTTClient      theMQTTClient = null;
  
    
    // -------------------------------------------------------------------------
    @Override
    public void update(EventBean[] newData, EventBean[] oldData) 
    {
        logger.info( "Departure Event Listener - update");

        //
        // Note the names align to the EPL statement.  We called them 'eventOne'
        //  'eventTwo' and 'eventThree' in the EPL
        HHBStatusEvent  eventOne = (HHBStatusEvent) newData[ 0 ].get( "eventOne" );
        HHBStatusEvent  eventTwo = (HHBStatusEvent) newData[ 0 ].get( "eventTwo" );
        HHBStatusEvent  eventThree = (HHBStatusEvent) newData[ 0 ].get( "eventThree" );

        //
        // Calculate time -- in seconds -- between the three events
        long deltaTime1 = (eventTwo.getDateTime().getTimeInMillis() - 
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;

        long deltaTime2 = (eventThree.getDateTime().getTimeInMillis() -
                           eventTwo.getDateTime().getTimeInMillis()) / 1000L;

        long deltaTime3 = (eventThree.getDateTime().getTimeInMillis() -
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;

        //
        // Create the message payload...
        //  CEP/CONCLUSION | 2014-05-28 15:24:23 -0600 | DEPARTURE | EPL1 | E1:E2 10 | E2:E3 88 | E1:E3 98 |

        // StringBuilder is enough here: the buffer never leaves this method
        StringBuilder   sb = new StringBuilder();
        sb.append( "CEP/CONCLUSION" );        sb.append( " | " );
        sb.append( getCurrentDateTime() );    sb.append( " | " );
        sb.append( "DEPARTURE" );             sb.append( " | " );
        sb.append( "EPL1" );                  sb.append( " | " );
        sb.append( "E1:E2 " );        sb.append( deltaTime1 );            sb.append( " | " );
        sb.append( "E2:E3 " );        sb.append( deltaTime2 );            sb.append( " | " );
        sb.append( "E1:E3 " );        sb.append( deltaTime3 );            sb.append( " | " );
            
        logger.info( "Departure Event Listener - ready to call publish!" );       
        this.theMQTTClient.publishMessage( "CEP/CONCLUSION", sb.toString() );
        logger.info( "Departure Event Listener - publish called!" );       
    }

    // -------------------------------------------------------------------------
    public  void    setTheMQTTClient (MQTTClient theClient)
    {
        this.theMQTTClient = theClient;
    }
    
    //--------------------------------------------------------------------------
    private static  String  getCurrentDateTime()
    {
        Date now = new Date();
        // Lowercase "yyyy" is the calendar year; uppercase "YYYY" would be the week year
        SimpleDateFormat format = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss Z");
        
        return format.format( now );
    }
}
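
A side note on the date pattern in getCurrentDateTime(): in SimpleDateFormat, lowercase y is the calendar year while uppercase Y is the week year (Java 7 and later), and the two disagree for a few days around New Year. Here's a minimal sketch of the difference. The class and helper names are made up for illustration, and the time zone and locale are pinned to UTC and Locale.US so the result doesn't depend on the machine it runs on.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class, not part of the listener code
public class WeekYearSketch {

    // Format an instant with the given pattern, pinned to UTC and Locale.US
    public static String format(String pattern, Date when) {
        SimpleDateFormat f = new SimpleDateFormat(pattern, Locale.US);
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f.format(when);
    }

    // Noon UTC on 31 December 2014, a day that already belongs to week 1 of 2015
    public static Date lastDayOf2014() {
        Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.US);
        c.clear();
        c.set(2014, Calendar.DECEMBER, 31, 12, 0, 0);
        return c.getTime();
    }

    public static void main(String[] args) {
        Date when = lastDayOf2014();
        System.out.println(format("yyyy-MM-dd", when)); // prints 2014-12-31
        System.out.println(format("YYYY-MM-dd", when)); // prints 2015-12-31
    }
}
```

For a message timestamp like the one in the payload, the calendar year (lowercase y) is the one you want.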

Things to note:
  1. The class implements the UpdateListener interface
  2. There's one abstract method to implement, update()
  3. The update() method receives the events that matched the rule
  4. You can do whatever you want when the rule triggers


Reacting to a Departure Event
In my case, I simply create another MQTT message and publish an event. The presumption is that there's another process elsewhere on the network that will subscribe to these events and take the proper action.
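
The subscribing process isn't shown in this post. As a rough sketch of what its first step might look like, here's one way to pull the fields back out of the pipe-delimited payload once it arrives as a string. The class and method names are hypothetical, the MQTT subscription itself is left out, and the field positions are assumed from the sample payload in the listener's comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for a subscriber; not part of the listener code
public class ConclusionParserSketch {

    // Split the payload on '|' and drop surrounding whitespace and empty pieces
    public static List<String> fields(String payload) {
        List<String> out = new ArrayList<>();
        for (String part : payload.split("\\|")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                out.add(trimmed);
            }
        }
        return out;
    }

    // Assumed layout: 0 = tag, 1 = timestamp, 2 = conclusion, 3 = rule name,
    // 4 and up = "label seconds" pairs such as "E1:E2 10"
    public static Map<String, Long> gaps(String payload) {
        Map<String, Long> gaps = new LinkedHashMap<>();
        List<String> f = fields(payload);
        for (int i = 4; i < f.size(); i++) {
            String[] labelAndSeconds = f.get(i).split("\\s+");
            gaps.put(labelAndSeconds[0], Long.parseLong(labelAndSeconds[1]));
        }
        return gaps;
    }

    public static void main(String[] args) {
        String payload = "CEP/CONCLUSION | 2014-05-28 15:24:23 -0600 | DEPARTURE"
                       + " | EPL1 | E1:E2 10 | E2:E3 88 | E1:E3 98 | ";
        System.out.println(fields(payload).get(2)); // prints DEPARTURE
        System.out.println(gaps(payload));          // prints {E1:E2=10, E2:E3=88, E1:E3=98}
    }
}
```

A real subscriber would call something like this from its message-arrived callback and then decide what to do with a DEPARTURE conclusion.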

It's very useful that the events that matched and triggered the conclusion are passed in to update(). The listener above uses them to compute the elapsed time between each pair of events.