(#15) Esper Example Continued - the Event Listener
We outlined the Esper CEP main code in post #13, and in post #14 we posted the Java code that backs an event. Here we'll cover the listener code -- the code that gets called when your event rule triggers.
Recall that we're looking for Departure Events and Arrival Events. Each occurs when three other events happen in a specific sequence within a specific amount of time.
Here's the code for the Departure Event Listener:
import java.text.SimpleDateFormat;
import java.util.Date;

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

// HHBStatusEvent and MQTTClient are this project's own classes (see the
// earlier posts); import them from wherever they live in your code base.

public class DepartureEventListener implements UpdateListener
{
    // Esper's using Log4J v1.2.17 - we might as well too
    private final org.apache.log4j.Logger logger =
        org.apache.log4j.Logger.getLogger( "DepartureEventListener" );

    private MQTTClient theMQTTClient = null;

    // -------------------------------------------------------------------------
    @Override
    public void update(EventBean[] newData, EventBean[] oldData)
    {
        logger.info( "Departure Event Listener - update");

        // Defensive check: newData can be null or empty if no new events
        // were delivered with this callback
        if (newData == null || newData.length == 0) {
            return;
        }

        //
        // Note the names align to the EPL statement. We called them 'eventOne'
        // 'eventTwo' and 'eventThree' in the EPL
        HHBStatusEvent eventOne = (HHBStatusEvent) newData[ 0 ].get( "eventOne" );
        HHBStatusEvent eventTwo = (HHBStatusEvent) newData[ 0 ].get( "eventTwo" );
        HHBStatusEvent eventThree = (HHBStatusEvent) newData[ 0 ].get( "eventThree" );

        //
        // Calculate time -- in whole seconds -- between the three events
        long deltaTime1 = (eventTwo.getDateTime().getTimeInMillis() -
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;
        long deltaTime2 = (eventThree.getDateTime().getTimeInMillis() -
                           eventTwo.getDateTime().getTimeInMillis()) / 1000L;
        long deltaTime3 = (eventThree.getDateTime().getTimeInMillis() -
                           eventOne.getDateTime().getTimeInMillis()) / 1000L;

        //
        // Create the message payload...
        // CEP/CONCLUSION | 2014-05-28 15:24:23 -0600 | DEPARTURE | EPL1 | E1:E2 10 | E2:E3 88 | E1:E3 98 |
        // (StringBuilder is enough here; the buffer never leaves this method)
        StringBuilder sb = new StringBuilder();
        sb.append( "CEP/CONCLUSION" ); sb.append( " | " );
        sb.append( getCurrentDateTime() ); sb.append( " | " );
        sb.append( "DEPARTURE" ); sb.append( " | " );
        sb.append( "EPL1" ); sb.append( " | " );
        sb.append( "E1:E2 " ); sb.append( deltaTime1 ); sb.append( " | " );
        sb.append( "E2:E3 " ); sb.append( deltaTime2 ); sb.append( " | " );
        sb.append( "E1:E3 " ); sb.append( deltaTime3 ); sb.append( " | " );

        // The client is injected through the setter below; skip the publish
        // rather than throw if it was never set
        if (this.theMQTTClient == null) {
            logger.warn( "Departure Event Listener - no MQTT client set, skipping publish" );
            return;
        }

        logger.info( "Departure Event Listener - ready to call publish!" );
        this.theMQTTClient.publishMessage( "CEP/CONCLUSION", sb.toString() );
        logger.info( "Departure Event Listener - publish called!" );
    }

    // -------------------------------------------------------------------------
    public void setTheMQTTClient (MQTTClient theClient)
    {
        this.theMQTTClient = theClient;
    }

    //--------------------------------------------------------------------------
    private static String getCurrentDateTime()
    {
        Date now = new Date();
        // Lowercase 'yyyy' is the calendar year; uppercase 'YYYY' is the
        // week-based year and gives the wrong year around New Year
        SimpleDateFormat format = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss Z");
        return format.format( now );
    }
}
- The class implements Esper's UpdateListener interface
- There's one method to implement, update()
- The update() method receives the events that matched the rule in newData, and you fetch each one by the name it was given in the EPL
- What you do when the rule triggers is entirely up to you
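The delta arithmetic and the payload layout are easy to check on their own, without Esper or an MQTT broker running. Here's a minimal sketch of that idea. The class DeparturePayloadSketch and its two methods are hypothetical names made up for illustration, and it takes plain epoch-millisecond values instead of HHBStatusEvent objects:

```java
public class DeparturePayloadSketch
{
    // Hypothetical helper: whole seconds between two epoch-millisecond stamps.
    // Integer division truncates, same as the listener's "/ 1000L".
    public static long secondsBetween(long earlierMillis, long laterMillis)
    {
        return (laterMillis - earlierMillis) / 1000L;
    }

    // Builds the same pipe-delimited layout the listener publishes.
    // The timestamp is passed in so the result is repeatable.
    public static String buildPayload(String timestamp, long e1, long e2, long e3)
    {
        StringBuilder sb = new StringBuilder();
        sb.append( "CEP/CONCLUSION" ).append( " | " );
        sb.append( timestamp ).append( " | " );
        sb.append( "DEPARTURE" ).append( " | " );
        sb.append( "EPL1" ).append( " | " );
        sb.append( "E1:E2 " ).append( secondsBetween( e1, e2 ) ).append( " | " );
        sb.append( "E2:E3 " ).append( secondsBetween( e2, e3 ) ).append( " | " );
        sb.append( "E1:E3 " ).append( secondsBetween( e1, e3 ) ).append( " | " );
        return sb.toString();
    }

    public static void main(String[] args)
    {
        // Three events at 0 s, 10 s and 98 s
        System.out.println(
            buildPayload( "2014-05-28 15:24:23 -0600", 0L, 10000L, 98000L ) );
    }
}
```

With events at 0, 10 and 98 seconds, the three deltas come out as 10, 88 and 98, which matches the sample payload in the listener's comment. Note that the division drops fractions of a second, so 1999 ms reports as 1.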
Reacting to a Departure Event
In my case, I simply create another MQTT message and publish it on the CEP/CONCLUSION topic. The presumption is that there's another process elsewhere on the network that will subscribe to these messages and take the proper action.
It's very useful to have the events that matched and triggered the conclusion passed in. Here that's what lets the listener report the time between each pair of events.
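The subscribing process isn't shown in these posts, but whatever it is, it has to pull the payload apart again. Here's a rough sketch of how that might look, assuming only the pipe-delimited layout shown in the listener. The class ConclusionParserSketch and the map keys "topic", "timestamp", "conclusion" and "rule" are my own hypothetical names, not part of the project:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConclusionParserSketch
{
    // Splits a CEP/CONCLUSION payload into named fields.
    // The first four fields are positional; the rest are "label value" pairs
    // such as "E1:E2 10".
    public static Map<String, String> parse(String payload)
    {
        List<String> fields = new ArrayList<>();
        for (String part : payload.split( "\\|" )) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {       // drops the blank after the last '|'
                fields.add( trimmed );
            }
        }
        if (fields.size() < 4) {
            throw new IllegalArgumentException( "Unexpected payload: " + payload );
        }

        Map<String, String> result = new LinkedHashMap<>();
        result.put( "topic", fields.get( 0 ) );
        result.put( "timestamp", fields.get( 1 ) );
        result.put( "conclusion", fields.get( 2 ) );
        result.put( "rule", fields.get( 3 ) );

        for (int i = 4; i < fields.size(); i++) {
            String[] labelAndValue = fields.get( i ).split( "\\s+", 2 );
            if (labelAndValue.length == 2) {
                result.put( labelAndValue[ 0 ], labelAndValue[ 1 ] );
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        Map<String, String> parsed = parse(
            "CEP/CONCLUSION | 2014-05-28 15:24:23 -0600 | DEPARTURE | EPL1 | E1:E2 10 | E2:E3 88 | E1:E3 98 | " );
        System.out.println( parsed.get( "conclusion" ) + " after "
            + parsed.get( "E1:E3" ) + " seconds" );
    }
}
```

A subscriber could then switch on the "conclusion" field to decide what to do, and use the deltas if the timing matters to it.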