Friday, December 31, 2010

Implementing Asynchronous Observer Pattern with Bean Proxy [Apache Camel Series]

Apache Camel is a very versatile message routing and enterprise integration framework.

One very useful, practical, and reasonable use case is to implement a Listener/Observer interface that delegates to an actual implementation with a flexible exchange pattern.

So from the point of view of the Observable (event source), the Observer is just a regular "POJO" Java object.

The Listener Interface


Here's the Listener interface:

public interface InvoiceListener {
    public void invoiceCreated(int id, String name);
}

What I Want

What I'd like to do is call a listener object as usual:

InvoiceListener invoiceListener = .....
invoiceListener.invoiceCreated(243, "Sumba Enterprise");


But the implementation should stay flexible: it can be a local object, a remote web service, or even routed asynchronously through a message queue (ActiveMQ) or XMPP.

A nice side effect is that the programming model stays the same. And of course, it's easily testable since your implementation is still a POJO.
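To illustrate the testability point, here is a minimal sketch of a hand-written test double. RecordingInvoiceListener and InvoiceService are hypothetical names made up for this illustration, and the interface is repeated (without the public modifier) only so that the snippet compiles as a single file.

```java
import java.util.ArrayList;
import java.util.List;

interface InvoiceListener {
    void invoiceCreated(int id, String name);
}

// Hypothetical test double: remembers every notification it receives.
class RecordingInvoiceListener implements InvoiceListener {
    final List<String> received = new ArrayList<String>();

    @Override
    public void invoiceCreated(int id, String name) {
        received.add(id + ":" + name);
    }
}

// Hypothetical event source: it only knows the InvoiceListener interface.
class InvoiceService {
    private final InvoiceListener listener;

    InvoiceService(InvoiceListener listener) {
        this.listener = listener;
    }

    void createInvoice(int id, String name) {
        // A real service would store the invoice first, then notify the listener.
        listener.invoiceCreated(id, name);
    }
}

class RecordingListenerDemo {
    public static void main(String[] args) {
        RecordingInvoiceListener listener = new RecordingInvoiceListener();
        new InvoiceService(listener).createInvoice(243, "Sumba Enterprise");
        // A unit test would assert on this list instead of printing it.
        System.out.println(listener.received);
    }
}
```

Because the event source depends only on the interface, the same InvoiceService would work unchanged with any other InvoiceListener, whether it is a local object or a proxy.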

The Implementation

This is an example implementation of the listener, that simply logs the input.
Note there can be as many implementations as required.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerInvoiceListener implements InvoiceListener {

    Logger logger = LoggerFactory.getLogger(getClass());

    public void invoiceCreated(int id, String name) {
        logger.info("Invoice #{} name: {} created", id, name);
        // Simulate asynchronous processing by delaying
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
        }
        logger.info("{}/{} done!", id, name);
    }
}

Note that I've added Thread.sleep() there, to simulate processing.

The Usual Way aka Useful for JUnit Tests

So here's the standard stuff:

InvoiceListener invoiceListener = new LoggerInvoiceListener();
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");

While this works, and is very useful in unit testing, it is not practical in our example.

The Listener blocks the calling thread while it processes, so the program runs for about 3000 ms for two invoices (2 x 1500 ms).

On top of that, the original thread cannot do anything else while waiting for the Listener to return.

Here's how we'll fix it using Apache Camel, without changing how we call the listener at all!

Setting Up Apache Camel

I will use Gradle to set up our project and automatically manage Apache Camel dependencies and SLF4j + Logback, which I use for logging.

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'maven'

sourceCompatibility = '1.6'

repositories {
    mavenRepo urls: "file:${System.properties['user.home']}/.m2/repository", name: 'local'
    mavenCentral()
}

dependencies {
    compile 'org.slf4j:slf4j-api:1.6.1'
    runtime 'org.slf4j:jcl-over-slf4j:1.6.1'
    runtime 'ch.qos.logback:logback-classic:0.9.26'
    compile 'org.apache.camel:camel-core:2.5.0'
}

Maven guys can do something similar, but I increasingly prefer Gradle due to its compact syntax and flexibility.

If you're not using dependency management or still use Ant, I strongly recommend Gradle for your project build system.

It has the flexibility of Ant, can optionally use conventions like Maven, offers the powerful dependency management of Ivy, and uses Groovy's clean syntax, so it won't intimidate you with verbose <angle brackets>.

To create the Eclipse IDE Project we'll create the source folder and execute Gradle's "eclipse" task.

mkdir -p src/main/java src/main/resources
gradle eclipse

Configuring CamelContext

Camel routes run inside a CamelContext. Here's the usual way to create a default CamelContext, configure routes, start the CamelContext, and stop it.

CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        // TODO: configure routes here
    }
});
camelContext.start();
// TODO: run your app here
camelContext.stop();

Configuring Camel routes can be done using a Java-based DSL (among other choices).

Here's a route DSL for our use case, that simply invokes a specified bean:

from("direct:invoice").bean(loggerInvoiceListener);

Camelizing Our App


With that in mind, let's see what our original application looks like:

private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;

public static void main(String[] args) throws Exception {
    invoiceListener = loggerInvoiceListener;
    invoiceListener.invoiceCreated(243, "Sumba Enterprise");
    logger.info("first invoice sent");
    invoiceListener.invoiceCreated(938, "Mina Co.");
    logger.info("second invoice sent");
}

And this is how to do the same thing using Apache Camel:

import org.apache.camel.CamelContext;
import org.apache.camel.builder.ProxyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;

public static void main(String[] args) throws Exception {
    CamelContext camelContext = new DefaultCamelContext();
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:invoice").bean(loggerInvoiceListener);
        }
    });
    camelContext.start();

    invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice")
        .build(InvoiceListener.class);
    invoiceListener.invoiceCreated(243, "Sumba Enterprise");
    logger.info("first invoice sent");
    invoiceListener.invoiceCreated(938, "Mina Co.");
    logger.info("second invoice sent");

    camelContext.stop();
}

You can see the Listener API usage has not changed at all.

Apart from the Camel runtime, the main difference lies in how we get the InvoiceListener object, which is now a proxy provided by Camel:

invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice")
  .build(InvoiceListener.class);
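If you wonder how an interface can be backed by something other than a hand-written class, here is a rough sketch of the general technique using the JDK's own java.lang.reflect.Proxy. This is not Camel's code, and I'm only assuming Camel does something conceptually similar behind ProxyBuilder. ProxySketch, createProxy, and the outbox list are hypothetical names for this illustration; a real integration would hand the captured call to a route instead of a list.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

interface InvoiceListener {
    void invoiceCreated(int id, String name);
}

class ProxySketch {

    // Hypothetical helper: returns an InvoiceListener whose calls are
    // captured as text "messages" in the given outbox.
    static InvoiceListener createProxy(final List<String> outbox) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) {
                // Every interface call arrives here as (method, arguments).
                // Methods inherited from Object (toString, equals, hashCode)
                // are not treated specially in this sketch.
                outbox.add(method.getName() + Arrays.toString(args));
                return null; // the listener method is void, so there is no result
            }
        };
        return (InvoiceListener) Proxy.newProxyInstance(
                InvoiceListener.class.getClassLoader(),
                new Class<?>[] { InvoiceListener.class },
                handler);
    }

    public static void main(String[] args) {
        List<String> outbox = new ArrayList<String>();
        InvoiceListener listener = createProxy(outbox);
        // The caller uses the plain interface, exactly as before
        listener.invoiceCreated(243, "Sumba Enterprise");
        System.out.println(outbox);
    }
}
```

The point of the sketch is that the caller never sees the handler. Whatever happens to the captured call (local invocation, queueing, remote delivery) is decided elsewhere, which is the role the Camel route plays in this article.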

Here's the output of the Camel-ized app:

23:59:09.629 [main] INFO  o.a.c.i.c.AnnotationTypeConverterLoader - Found 3 packages with 13 @Converter classes to load
23:59:09.666 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 146 type converters in 0.405 seconds
23:59:09.774 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
23:59:09.775 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 is started.
23:59:09.775 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 0.546 seconds
23:59:09.788 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
23:59:11.289 [main] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
23:59:11.295 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
23:59:11.296 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
23:59:12.796 [main] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
23:59:12.797 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
23:59:12.797 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext:camel-1) is shutting down
23:59:12.798 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
23:59:12.806 [Camel Thread 0 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspension deferred.
23:59:12.807 [Camel Thread 0 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete.
23:59:12.807 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
23:59:12.809 [main] INFO  o.a.c.impl.DefaultInflightRepository - Shutting down with no inflight exchanges.
23:59:12.810 [main] INFO  o.a.camel.impl.DefaultCamelContext - Uptime: 3.581 seconds
23:59:12.810 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) is shutdown in 0.013 seconds

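The app runs for a little over 3.5 seconds, of which about half a second is Apache Camel startup (the log reports 0.546 seconds, including 0.405 seconds to load the type converters). This is the minimal overhead of Camel.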

I Want Them All at Once! Making It Asynchronous

A useful use case is to make the listener invocations asynchronous, i.e. performed by multiple threads, without changing the caller code at all.
Let's see how we can do this with Camel.

All we need to do is change:

from("direct:invoice").bean(loggerInvoiceListener);

to:

from("direct:invoice").inOnly().to("seda:invoice.queue");
from("seda:invoice.queue").threads().bean(loggerInvoiceListener);

The first route tells Camel to turn the original invocation into an InOnly exchange (meaning that no response/reply is needed) and send it to a SEDA queue endpoint.

The second route consumes the messages from the SEDA queue and spreads them over multiple threads (the default thread pool is 10 threads, but you can specify the size yourself). Each thread invokes the destination implementation, which is the loggerInvoiceListener bean.
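As a rough analogy only (this is plain java.util.concurrent, not how Camel's SEDA component is implemented), the effect of those two routes resembles wrapping the listener so that each call is queued and handed to a worker pool. AsyncInvoiceListener, shutdownAndWait, and AsyncSketch are hypothetical names for this sketch, and the pool size of 10 simply mirrors the default mentioned above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

interface InvoiceListener {
    void invoiceCreated(int id, String name);
}

// Hypothetical wrapper: queues each call for a worker thread and returns immediately.
class AsyncInvoiceListener implements InvoiceListener {
    private final InvoiceListener delegate;
    private final ExecutorService pool;

    AsyncInvoiceListener(InvoiceListener delegate, int threads) {
        this.delegate = delegate;
        // A fixed pool keeps pending tasks in an in-memory queue
        this.pool = Executors.newFixedThreadPool(threads);
    }

    @Override
    public void invoiceCreated(final int id, final String name) {
        // Fire and forget: the caller does not wait for a result
        pool.execute(new Runnable() {
            @Override
            public void run() {
                delegate.invoiceCreated(id, name);
            }
        });
    }

    // Stop accepting new calls, then wait for queued and running ones to finish.
    boolean shutdownAndWait(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class AsyncSketch {
    public static void main(String[] args) {
        // Stand-in for LoggerInvoiceListener: slow, and reports its thread
        InvoiceListener slow = new InvoiceListener() {
            @Override
            public void invoiceCreated(int id, String name) {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(Thread.currentThread().getName() + " finished invoice #" + id);
            }
        };
        AsyncInvoiceListener listener = new AsyncInvoiceListener(slow, 10);
        listener.invoiceCreated(243, "Sumba Enterprise");
        System.out.println("first invoice sent");
        listener.invoiceCreated(938, "Mina Co.");
        System.out.println("second invoice sent");
        listener.shutdownAndWait(5);
    }
}
```

The difference with the Camel version is where the decision lives: here the caller has to be handed a different wrapper class, while with Camel the proxy stays the same and only the route definition changes.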

Here's how it looks right now:

00:11:44.952 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://invoice.queue]
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 0.529 seconds
00:11:44.969 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
00:11:44.970 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
00:11:44.970 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext:camel-1) is shutting down
00:11:44.971 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
00:11:44.971 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:11:44.971 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 suspension deferred.
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspension deferred.
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds.
00:11:45.975 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds.
00:11:46.471 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:11:46.471 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
00:11:46.976 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete.
00:11:46.976 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete.
00:11:46.976 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 2 seconds
00:11:46.977 [main] INFO  o.a.c.impl.DefaultInflightRepository - Shutting down with no inflight exchanges.
00:11:46.978 [main] INFO  o.a.camel.impl.DefaultCamelContext - Uptime: 2.553 seconds
00:11:46.979 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) is shutdown in 2.008 seconds

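You can see from the log that Camel has executed the listener invocations in parallel threads. As a result, our application runs in about 2.5 seconds instead of ~3.5 seconds as before. Note that the main thread logs both "sent" messages within a millisecond of each other; the rest of the run is Camel's graceful shutdown waiting for the two inflight exchanges to complete.

You can also see that the order of execution is very different now.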

Synchronous invocations:

23:59:09.788 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
23:59:11.289 [main] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
23:59:11.295 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
23:59:11.296 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
23:59:12.796 [main] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
23:59:12.797 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent

Asynchronous invocations, with Camel:

00:11:44.969 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
00:11:44.970 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
...
00:11:44.971 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:11:44.971 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
...
00:11:46.471 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:11:46.471 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!

Instead of each invocation waiting for the previous one to complete, the invocations now run independently.

All of this without changing the caller code, the interface API, or the implementation code.

All you do is change the Camel routes.

Running the Example

To make it really easy for you (and especially for myself, as I tend to forget things!), I've made the examples freely available as the camel-async-redo project on GitHub.

You'll need Git and Gradle (and JDK 1.6) to check out and then build it.
Gradle will automatically download and cache the required dependencies for you.

git clone git://github.com/ceefour/camel-async-redo.git
cd camel-async-redo
gradle exec

Conclusion

Apache Camel is an unobtrusive way to add flexible routing, messaging, and integration patterns to your Java application.
(Camel can also be used to weave together any combination of applications, even remote services, so not just Java, but that's another topic...)

I highly suggest the Camel in Action book for the best in-depth guide and examples for using Camel to develop your applications more productively.