Monday, January 4, 2010

Advanced Event Notification Framework with Apache Camel


Let's build an Event Notification Framework using Apache Camel... Second Edition.

I've demonstrated how to do event notification with Apache Camel before, but in that version the classes were tightly coupled to Camel URI endpoints.

This time, I have these goals in mind:
  • Decoupled. The event source and its listener(s) must be as decoupled as possible, without making the design overly complex.
  • As POJO as possible. Event sources, event classes, and event listeners need not import the event framework classes/interfaces. Annotations may be used.
See also: Event Notification with Spring Integration

    The Big Picture

    I will illustrate this technique using a sensor-and-display simulation application. The sensor (supposedly) detects a piece of text, and when the text changes, the sensor notifies its listeners. The displays, as listeners, respond to such events and update what they show.

    Sensors and Displays do not need to know each other, beyond mutually agreed Listener interfaces.

    I've implemented a runnable proof-of-concept application, so the technique can be tried out in practice.

    You can download/explore the example from: http://github.com/ceefour/eventfx.camel

    Listener Interfaces

    The Listener interfaces are the contract between event sources and event listeners. I'm using generic listener interfaces to avoid creating a separate interface for every event type. It's also possible to use generic Event payload classes/interfaces if we want to go that far.

    Let's define a generic listener interface:
    package com.soluvas.samples.eventfx.camel;
    
    import java.util.EventListener;
    
    import org.apache.camel.InOnly;
    
    public interface AsyncListener<E> extends EventListener {
    
     @InOnly
     void notify(E event);
    }
    This defines our asynchronous listener, that is, one for fire-and-forget events. The event source does not need a response in this case.

    Let's define another generic interface for synchronous events, that provides a response:
    package com.soluvas.samples.eventfx.camel;
    
    import java.util.EventListener;
    
    import org.apache.camel.InOut;
    
    public interface SyncListener<E, R> extends EventListener {
    
     @InOut
     R update(E event);
    }
    
    I'm going to use a simple Event payload class, which contains a single String text property. If there are many events like this, we can potentially use a generic event class.
    package com.soluvas.samples.eventfx.camel;
    
    public class SensorEvent {
    
     private String text;
    
     public SensorEvent(String text) {
      super();
      this.text = text;
     }
    
     public String getText() {
      return text;
     }
    
     public void setText(String text) {
      this.text = text;
     }
     
    }
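
    As a rough sketch of the generic event class idea mentioned before the SensorEvent listing, a single payload-carrying class could serve many event kinds. The name GenericEvent and its type property are hypothetical and are not part of the example project:

```java
// Hypothetical generic event class; an illustration only, not taken from
// the eventfx.camel example project.
final class GenericEvent<T> {

    private final String type; // illustrative label, e.g. "sensor.textChanged"
    private final T payload;   // the data carried by the event

    GenericEvent(String type, T payload) {
        this.type = type;
        this.payload = payload;
    }

    String getType() {
        return type;
    }

    T getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return type + "(" + payload + ")";
    }
}

class GenericEventDemo {
    public static void main(String[] args) {
        GenericEvent<String> event =
                new GenericEvent<String>("sensor.textChanged", "hello");
        System.out.println(event); // prints "sensor.textChanged(hello)"
    }
}
```

    A listener for such events would then be declared as, for example, AsyncListener<GenericEvent<String>>, at the cost of losing the self-describing class name that SensorEvent provides.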
    

    The Event Source: Sensor

    Let's create the code for Sensor. Input for this sensor is manually set for the simulation.

    The sensor can be hooked to a SyncListener (synchronous SensorEvent listener that returns String) and/or AsyncListener (asynchronous SensorEvent listener).
    package com.soluvas.samples.eventfx.camel;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class Sensor {
    
     private static Log log = LogFactory.getLog(Sensor.class);
    
     private SyncListener<SensorEvent, String> sensorSyncListener;
     private AsyncListener<SensorEvent> sensorAsyncListener;
    
     private void fireSensor(SensorEvent event) {
      if (sensorSyncListener != null) {
       String result = sensorSyncListener.update(event);
       log.info("Response: " + result);
      }
      if (sensorAsyncListener != null) {
       sensorAsyncListener.notify(event);
      }
     }
    
     public void updateText(String text) {
      log.info("updateText: " + text);
      fireSensor(new SensorEvent(text));
     }
    
     public SyncListener<SensorEvent, String> getSensorSyncListener() {
      return sensorSyncListener;
     }
    
     public void setSensorSyncListener(SyncListener<SensorEvent, String> sensorSyncListener) {
      this.sensorSyncListener = sensorSyncListener;
     }
    
     public AsyncListener<SensorEvent> getSensorAsyncListener() {
      return sensorAsyncListener;
     }
    
     public void setSensorAsyncListener(
       AsyncListener<SensorEvent> sensorAsyncListener) {
      this.sensorAsyncListener = sensorAsyncListener;
     }
    
    }
    

    The Event Listener: Display

    The Display implements both AsyncListener<SensorEvent> and SyncListener<SensorEvent, String> interfaces, so it can be used for both purposes.
    package com.soluvas.samples.eventfx.camel;
    
    import javax.annotation.PostConstruct;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class Display implements SyncListener<SensorEvent, String>, AsyncListener<SensorEvent> {
     
     private static Log log = LogFactory.getLog(Display.class);
     private String name;
     
     @PostConstruct
     public void initialize() {
      log.info("Display " + getName() + " created.");
     }
    
     @Override
     public String update(SensorEvent event) {
      log.info("[" + name + "] is updated: '" + event.getText() + "'");
      return getName() + " received " + event.getText();
     }
    
     @Override
     public void notify(SensorEvent event) {
      log.info("[" + name + "] is notified: '" + event.getText() + "'");
      
     }
    
     public String getName() {
      return name;
     }
    
     public void setName(String name) {
      this.name = name;
     }
    
    }
    
    The display's implementation simply logs the SensorEvent's text at INFO level.

    Simulating Sensor Changes

    Since the Sensor must be manually updated, I created a simulator class that periodically updates the sensor with the current time.
    package com.soluvas.samples.eventfx.camel;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class SensorSimulator {
     
     private static Log log = LogFactory.getLog(SensorSimulator.class);
     
     @Autowired
     private Sensor sensor;
     private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    
     @PostConstruct
     public void initialize() {
      scheduledExecutor.scheduleAtFixedRate(new Runnable() {
       
       public void run() {
        sensor.updateText("Something happens at " + SimpleDateFormat.getTimeInstance().format(new Date()));
       }
      }, 2000, 2000, TimeUnit.MILLISECONDS);
      log.info("Sensor simulator initialized.");
     }
     
     @PreDestroy
     public void destroy() throws InterruptedException {
      log.info("Destroying Sensor simulator...");
      scheduledExecutor.shutdown();
      scheduledExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
     }
    }
    
    The Simulator is an autonomous Spring component bean. It will get autowired to a Sensor bean (object) and do its job automatically until it is destroyed (in our case, until the application is terminated.)

    Wiring The Application

    To wire the beans that make up the application, I use Spring Framework 2.5.6 and Apache Camel 2.1.0 together.

    I put the following inside META-INF/spring/camel-context.xml :
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xmlns:context="http://www.springframework.org/schema/context"
     xmlns:camel="http://camel.apache.org/schema/spring"
     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    
     <context:component-scan base-package="com.soluvas.samples.eventfx.camel" />
    
     <bean class="com.soluvas.samples.eventfx.camel.Sensor">
      <property name="sensorSyncListener">
       <bean class="org.apache.camel.spring.remoting.CamelProxyFactoryBean">
         <property name="serviceUrl" value="seda:Sensor.SensorEventSync"/>
         <property name="serviceInterface" value="com.soluvas.samples.eventfx.camel.SyncListener"/>
       </bean>
      </property>
      <property name="sensorAsyncListener">
       <bean class="org.apache.camel.spring.remoting.CamelProxyFactoryBean">
         <property name="serviceUrl" value="seda:Sensor.SensorEventAsync"/>
         <property name="serviceInterface" value="com.soluvas.samples.eventfx.camel.AsyncListener"/>
       </bean>
      </property>
     </bean>
     <bean id="display1" class="com.soluvas.samples.eventfx.camel.Display">
      <property name="name" value="Sony(sync)" />
     </bean>
     <bean id="display2" class="com.soluvas.samples.eventfx.camel.Display">
      <property name="name" value="Samsung(async)" />
     </bean>
     <camelContext xmlns="http://camel.apache.org/schema/spring">
      <packageScan>
       <package>com.soluvas.samples.eventfx.camel</package>
      </packageScan>
     </camelContext>
    
    </beans>
    
    The above Spring configuration file sets up:
    • autodetection of stereotype-annotated (@Component) components
    • a Sensor bean that gets wired to Camel proxy listeners.
      The sync events go to seda:Sensor.SensorEventSync
      The async events go to seda:Sensor.SensorEventAsync
    • two Display beans, named Sony(sync) and Samsung(async) (I don't endorse either of them)
    • Camel context bean, which will scan com.soluvas.samples.eventfx.camel package for route builders
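
    To make the idea of a "proxy listener" more concrete, here is a minimal plain-Java sketch of the general technique: an object that implements the listener interface but turns each method call into a message placed on a queue. It is only an illustration under my own assumptions (the queue stands in for a seda: endpoint, and QueueProxyDemo and proxyTo are made-up names); it is not how Camel's proxy bean is actually implemented.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Same shape as the article's AsyncListener, minus the Camel annotation.
interface AsyncListener<E> {
    void notify(E event);
}

class QueueProxyDemo {

    // Returns an object implementing listenerType. Calls to the interface's
    // methods put their first argument on the queue instead of running a listener.
    static <T> T proxyTo(Class<T> listenerType, final BlockingQueue<Object> queue) {
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) {
                if (method.getDeclaringClass() == Object.class) {
                    // toString/hashCode/equals are answered locally.
                    if (method.getName().equals("toString")) {
                        return "QueueProxy";
                    }
                    if (method.getName().equals("hashCode")) {
                        return System.identityHashCode(proxy);
                    }
                    return proxy == args[0]; // equals
                }
                queue.add(args[0]); // the event becomes a queued message
                return null;        // fire-and-forget: nothing to return
            }
        };
        Object proxy = Proxy.newProxyInstance(
                listenerType.getClassLoader(), new Class<?>[] { listenerType }, handler);
        return listenerType.cast(proxy);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
        AsyncListener<String> listener = proxyTo(AsyncListener.class, queue);
        listener.notify("text changed");  // looks like a plain method call
        System.out.println(queue.poll()); // prints "text changed"
    }
}
```

    The event source only ever sees the listener interface, which is why the Sensor can stay free of any routing concerns.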
    The final piece of work is the application itself, which contains:
    1. Spring context bootstrapping
    2. RouteBuilder configuration
    I chose to put both pieces of functionality in one class; however, they are not related and may be split.

    The code for MainRouteBuilder.java is as follows:
    package com.soluvas.samples.eventfx.camel;
    
    import org.apache.camel.builder.RouteBuilder;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class MainRouteBuilder extends RouteBuilder {
    
        /**
         * A main() so we can easily run these routing rules in our IDE
         */
        public static void main(String... args) {
         new ClassPathXmlApplicationContext("META-INF/spring/*.xml");
        }
    
        /**
         * Lets configure the Camel routing rules using Java code...
         */
        public void configure() {
         from("seda:Sensor.SensorEventSync").beanRef("display1");
         from("seda:Sensor.SensorEventAsync").delay(1200).beanRef("display2");
        }
    }
    
    The RouteBuilder implementation (configure()) above:
    • routes seda:Sensor.SensorEventSync events to display1 (Sony).
    • delays seda:Sensor.SensorEventAsync events by 1,200 milliseconds (to simulate asynchronous behavior) and routes them to display2 (Samsung).

    Camel-based Event Notification Framework in Action

    It's hard to visualize this application in writing (you have to see it running to notice the event timings). But this is the application output:
    [                          main] CamelNamespaceHandler          INFO  camel-osgi.jar not detected in classpath
    [                          main] SensorSimulator                INFO  Sensor simulator initialized.
    [                          main] Display                        INFO  Display Sony(sync) created.
    [                          main] Display                        INFO  Display Samsung(async) created.
    [                          main] DefaultCamelContext            INFO  Apache Camel 2.1.0 (CamelContext:camelContext) is starting
    [                          main] DefaultCamelContext            INFO  JMX enabled. Using DefaultManagedLifecycleStrategy.
    [                          main] DefaultCamelContext            INFO  Apache Camel 2.1.0 (CamelContext:camelContext) started
    [               pool-1-thread-1] Sensor                         INFO  updateText: Something happens at 6:17:04 PM
    [ seda://Sensor.SensorEventSync] Display                        INFO  [Sony(sync)] is updated: 'Something happens at 6:17:04 PM'
    [               pool-1-thread-1] Sensor                         INFO  Response: Sony(sync) received Something happens at 6:17:04 PM
    [seda://Sensor.SensorEventAsync] Display                        INFO  [Samsung(async)] is notified: 'Something happens at 6:17:04 PM'
    [               pool-1-thread-1] Sensor                         INFO  updateText: Something happens at 6:17:06 PM
    [ seda://Sensor.SensorEventSync] Display                        INFO  [Sony(sync)] is updated: 'Something happens at 6:17:06 PM'
    [               pool-1-thread-1] Sensor                         INFO  Response: Sony(sync) received Something happens at 6:17:06 PM
    
    We have successfully wired the Simulator, Sensor, and Displays, through Apache Camel.

    The Sensor does not know about the Displays. The Displays do not know that they're wired to a Sensor, only that they can act as SensorEvent listeners (both async and sync).

    Neither Sensor nor Display is aware that it is being connected via Apache Camel.

    When firing an asynchronous event, Sensor expects the method call to return immediately. So even though we have delayed the async event by 1,200 milliseconds, Sensor is free to proceed with other processing during that time.
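
    The timing behavior described here can be reproduced without Camel. The following is a small sketch, assuming an in-memory queue with one consumer thread as a stand-in for the seda: endpoint and a sleep as a stand-in for the route's delay; FireAndForgetDemo and measure are names I made up for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FireAndForgetDemo {

    // Returns { millis the caller spent handing over the event,
    //           millis until the listener side had finished with it }.
    static long[] measure(final long listenerDelayMillis) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final CountDownLatch handled = new CountDownLatch(1);

        // Consumer thread: plays the role of the route feeding the display.
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    String event = queue.take();
                    Thread.sleep(listenerDelayMillis); // stands in for the delay
                    System.out.println("listener notified: " + event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    handled.countDown();
                }
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        long start = System.nanoTime();
        queue.add("Something happens"); // fire and forget: does not wait
        long callerMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        try {
            handled.await(); // only so the demo can report the total time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long totalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new long[] { callerMillis, totalMillis };
    }

    public static void main(String[] args) {
        long[] t = measure(1200);
        System.out.println("caller busy for ~" + t[0]
                + " ms, listener done after ~" + t[1] + " ms");
    }
}
```

    The caller's share of the time should be close to zero while the listener finishes more than a second later, which matches the gap between the Sensor and Samsung(async) lines in the log output.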

    Demonstrating Camel's Flexibility: Multicasting

    I know you're smart enough to realize that Camel is only used as infrastructure here, and you may be thinking, "but it can be done without Camel!" You'd be right... with some gotchas.

    You can leave out Camel and use Spring to wire the event source directly to its listeners. With synchronous events, that's doable and may even be exactly how you want to do it (the "Keep It Simple, Stupid" principle).

    With asynchronous events, on the other hand, it's going to take more work. Of course, you can code the listeners to be asynchronous yourself, but I think it's better to leave that job to Camel's configuration (routes).
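
    To give a feel for the extra work involved, here is a sketch of what hand-coding the asynchronous part might look like: a wrapper that hands each event to an executor before calling the real listener. ExecutorBackedListener is a hypothetical helper written for this illustration, not something from the example project or from Camel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Same shape as the article's AsyncListener, minus the Camel annotation.
interface AsyncListener<E> {
    void notify(E event);
}

// Hypothetical wrapper: makes any listener asynchronous by running it
// on an executor thread instead of the caller's thread.
class ExecutorBackedListener<E> implements AsyncListener<E> {

    private final AsyncListener<E> delegate;
    private final ExecutorService executor;

    ExecutorBackedListener(AsyncListener<E> delegate, ExecutorService executor) {
        this.delegate = delegate;
        this.executor = executor;
    }

    public void notify(final E event) {
        executor.execute(new Runnable() {
            public void run() {
                delegate.notify(event);
            }
        });
    }
}

class ExecutorBackedListenerDemo {

    // Sends the events through the wrapper and returns what the listener saw.
    static List<String> deliver(String... events) {
        final List<String> received = new CopyOnWriteArrayList<String>();
        AsyncListener<String> display = new AsyncListener<String>() {
            public void notify(String event) {
                received.add(event);
            }
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AsyncListener<String> async =
                new ExecutorBackedListener<String>(display, executor);
        for (String event : events) {
            async.notify(event); // returns without waiting for the display
        }
        executor.shutdown(); // already queued events are still delivered
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(deliver("a", "b")); // prints "[a, b]"
    }
}
```

    Thread management, shutdown, and ordering all become the application's problem in this version, which is the kind of plumbing a route configuration can take over.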

    For example, if I want to configure the sensor so that all events are asynchronous and "multicast" to both display1 and display2, I simply change the Camel route configuration to:
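    // needs: import org.apache.camel.model.ProcessorDefinition;
    public void configure() {
        // multicast() sends a copy of each event to every output added below
        ProcessorDefinition def = from("seda:Sensor.SensorEventAsync").multicast();
        def.beanRef("display1");
        def.delay(700).beanRef("display2");
    }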
    
    And that's it. No need to change the implementation of Display (to pass the call to next in chain) nor the Sensor to handle multiple asynchronous listeners (traditionally, you'd need to use a List of listeners).
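
    For comparison, this is roughly what the fan-out would look like if it were written by hand in plain Java. It is a simplified sketch under my own assumptions: FanOutListener is a hypothetical helper, and it calls the targets one after another on the caller's thread, so it covers neither the asynchronous delivery nor the delay of the route above.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the article's AsyncListener, minus the Camel annotation.
interface AsyncListener<E> {
    void notify(E event);
}

// Hypothetical helper: a single listener that forwards every event
// to a list of other listeners.
class FanOutListener<E> implements AsyncListener<E> {

    private final List<AsyncListener<E>> targets;

    FanOutListener(List<AsyncListener<E>> targets) {
        this.targets = new ArrayList<AsyncListener<E>>(targets);
    }

    public void notify(E event) {
        for (AsyncListener<E> target : targets) {
            target.notify(event);
        }
    }
}

class FanOutDemo {

    // A stand-in display that records what it was told.
    static AsyncListener<String> display(final String name, final List<String> seen) {
        return new AsyncListener<String>() {
            public void notify(String event) {
                seen.add(name + ": " + event);
            }
        };
    }

    static List<String> deliverToBoth(String event) {
        List<String> seen = new ArrayList<String>();
        List<AsyncListener<String>> displays = new ArrayList<AsyncListener<String>>();
        displays.add(display("display1", seen));
        displays.add(display("display2", seen));
        new FanOutListener<String>(displays).notify(event);
        return seen;
    }

    public static void main(String[] args) {
        // prints "[display1: hello, display2: hello]"
        System.out.println(deliverToBoth("hello"));
    }
}
```

    Every new routing requirement (a delay for one target, a filter for another) would mean more code of this kind, whereas with Camel it stays a change to the route definition.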

    Conclusion: Implementing Event Notification with Apache Camel

    Using Apache Camel proxy beans instead of plain Java (or plain Spring) method calls for event notification allows you to do the following, among other things:
    • Still keep your POJO interfaces and method calls.
    • Switch between asynchronous and synchronous calls as you wish.
    • Route the events to practically anywhere you want.
    • Multicast events and apply other routing techniques.
    • Transport events over the network, via HTTP/email/XMPP/JMS or other protocols supported by Camel.
    • Decouple not only event sources from event listeners, but also event routing from both.

    Learning More on Enterprise Integration


    Apache Camel is all about message routing, which is part of enterprise integration. If you want to learn more about enterprise integration, I heartily recommend the book Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions.

    1 comment:

    1. Great walkthrough Hendy, thanks. I'm just getting started with Camel, and the flexibility to do things like event-based processing is what attracted me to the framework.
