Friday, December 31, 2010

Remote Observer Pattern with Publish-Subcribe via XMPP [Apache Camel Series]



Apache Camel is a powerful message routing framework that can be used to implement all common enterprise integration patterns.

I'm going to look at Remote Observer Pattern, which is understandable complex to implement using traditional techniques.

However, with Apache Camel, it becomes surprisingly simple!

Reviewing the API

I'll just expand upon the previously blogged Asynchronous Observer/Listener with Apache Camel.

Here's the Listener interface, which is still the same:

public interface InvoiceListener {
public void invoiceCreated(int id, String name);
}

Still the same exact old Listener API, to prove that Apache Camel can be unobtrusive if you want. (yes, I want it to "stay out of my API way". Let Camel be the glue.)

And the same way to call the Observer :

invoiceListener.invoiceCreated(243, "Sumba Enterprise");

Publish-Subscribe Pattern via XMPP

What I want to do is route the listener invocation from the caller through a message queue channel/topic.

Interested observers can subscribe to that topic to receive (aka "consume" in Camel-speak) events/notifications/messages.

This is called Publish-Subscribe in Enterprise Integration Patterns.
The usual way to do this would be to use JMS and a message queue server/broker such as Apache ActiveMQ.

But it doesn't have to be. Besides if you use JMS directly you'll be tied to JMS API.

With Apache Camel you're free to change the underlying routing as you wish (XMPP, JMS, direct call, etc.).

Even if you want to change to another routing framework e.g. Spring Integration it's possible since both framework support the POJO bean proxying approach.

I have at least four reasons to use XMPP:
  1. It's very easy to debug. When there are problems you can just use Pidgin or Empathy or Spark or any XMPP/Jabber client to monitor what flows inside the pipes. No need for heavy tools. Even your grandma's mobile phone can do the job.
  2. No tooling needed. Same as above. JMS/ActiveMQ may have more/better tooling but you need the tooling. This can be either strength or weakness depending on how you look at it.
  3. It's just HTTP. I have never tried it but theoretically you can use any HTTP tool to tweak inside XMPP packets since in a way it's "XML over HTTP".
  4. No fancy server needed. You can install Openfire or ejabberd, just like you can install ActiveMQ. But you don't have to, you can use Jabber network or Google Talk or Google Apps' GTalk or even Facebook Chat's XMPP support. All free to use. (If you decide to use these services make sure to comply to their terms of use.)
Note: XMPP server/network's support of XMPP conference room chat functionality may vary.

Adding Camel XMPP and JSON Serialization

To use Camel with XMPP, let's revisit our project's Gradle build script and add camel-xmpp and camel-xstream as dependencies :

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'maven'
sourceCompatibility = '1.6'
repositories {
mavenRepo urls: "file:${System.properties['user.home']}/.m2/repository", name: 'local'
mavenCentral()
}
dependencies {
compile 'org.slf4j:slf4j-api:1.6.1'
runtime 'org.slf4j:jcl-over-slf4j:1.6.1'
runtime 'ch.qos.logback:logback-classic:0.9.26'
compile 'org.apache.camel:camel-core:2.5.0'
runtime 'org.apache.camel:camel-xmpp:2.5.0'
runtime 'org.apache.camel:camel-xstream:2.5.0'
}


While it's possible to use just Camel XMPP without Xstream, in practice it's much easier if we use a serialization library.

Camel provides several providers to serialize (i.e. marshall and unmarshall) our invocation/messages over the network transport (such as XMPP).
Camel calls these providers Data Formats.

I like to use JSON because it's somewhat human-readable (at least it's programmer-readable! :-) ) and relatively compact.

XML is also okay if you prefer it, both are supported by Xstream.

Camel also supports another JSON provider called Jackson but I wasn't able to successfully marshal BeanInvocation (Camel's underlying data class for proxied bean invocation) with it, so it's Xstream for now.

Routing to XMPP

 Let's review the last routing we had :

from("direct:invoice").inOnly().to("seda:invoice.queue");
from("seda:invoice.queue").threads().bean(loggerInvoiceListener);

This is fine. To make our invocation support XMPP all we need to do is change the routes, nothing else:

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").marshal().json().to(xmppUri);
from(xmppUri).unmarshal().json().bean(loggerInvoiceListener);

This is all that's needed. Apart from adding the dependencies that we've done above, no code needs to be changed!

I'm sure you can guess what the URI means, it tells Camel XMPP connector to create an XMPP endpoint with:
  • username: abispulsabot
  • domain: localhost
  • room: abispulsa.refill
  • password: test
I guess I didn't really have to write that, but just for the sake of assertiveness. ;-)

Note: if you want to use XML just change un/marshal().json() to un/marshal().xstream(). Simple isn't it? No fuss. ;-)

Making It Behave

The above routing is still not cool:
  1. It's single-threaded, i.e. both incoming and outgoing messages are blocking.
  2. It does not handle non-JSON messages.
So let's make it better.

Enabling Multi-threading aka Asynchronous Processing

To enable multi-threading we use the magic threads() DSL :

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().unmarshal().json().bean(loggerInvoiceListener);

That's it. Now each processing will use its own thread (up to a default maximum of 10 threads per thread pool, but you can configure, e.g. "threads(5)")

Logging Non-JSON Messages

So what happens when a non-JSON messages is posted to the XMPP room? Let's just log it:

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().choice()
.when(body().startsWith("{")).unmarshal().json().bean(loggerInvoiceListener)
.otherwise().to("log:xmpp");

We use conditional routing there, using choice() and  when() and otherwise().
And Predicates (constraint expressions) such as body().startsWith().
By the way, Camel supports Predicates with a lot more expression languages, so you won't run out of options.

You'll complain that the above routing still doesn't handle invalid JSON, but let's just be satisfied with that as my example.
I believe you can make better routes much faster than I do. ;-)

And So It Goes...

Again for the sake of completeness, I present you the entire app:

import org.apache.camel.CamelContext;
import org.apache.camel.builder.ProxyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;
...
CamelContext camelContext = new DefaultCamelContext();
loggerInvoiceListener = new LoggerInvoiceListener();
final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().choice()
.when(body().startsWith("{")).unmarshal().json().bean(loggerInvoiceListener)
.otherwise().to("log:xmpp");
}
});
camelContext.start();
invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice").build(InvoiceListener.class);
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");
invoiceListener.invoiceCreated(312, "Crux Market");
logger.info("third invoice sent");
try {
while (true) { // event loop so you can send messages
Thread.sleep(1000);
}
} finally {
camelContext.stop();
}

Here's an example log output:

00:57:03.280 [main] INFO  o.a.c.c.xmpp.XmppGroupChatProducer - Joined room: abispulsa.refill@conference.annafi as: abispulsabot
00:57:03.298 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
00:57:03.338 [main] INFO  o.a.c.component.xmpp.XmppConsumer - Joined room: abispulsa.refill@conference.annafi as: abispulsabot
00:57:03.338 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[xmpp://abispulsabot@localhost/?password=******&room=abispulsa.refill]
00:57:03.339 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
00:57:03.339 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 16.448 seconds
00:57:03.510 [main] INFO  id.co.bippo.camelxmppredo.App - first invoice sent
00:57:03.528 [main] INFO  id.co.bippo.camelxmppredo.App - second invoice sent
00:57:03.557 [main] INFO  id.co.bippo.camelxmppredo.App - third invoice sent
00:57:03.570 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
00:57:05.071 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
00:57:05.078 [Camel Thread 4 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:57:06.579 [Camel Thread 4 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:57:06.584 [Camel Thread 5 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #312 name: Crux Market created
00:57:08.085 [Camel Thread 5 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 312/Crux Market done!
00:57:26.399 [Camel Thread 6 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #7890 name: Bippo Indonesia created
00:57:27.900 [Camel Thread 6 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 7890/Bippo Indonesia done!
00:57:41.776 [Camel Thread 7 - Threads] INFO  xmpp - Exchange[ExchangePattern:InOnly, BodyType:String, Body:You can also filter messages! :)]

Notice that in the above log I posted an invoice #7890 "Bippo Indonesia" using Pidgin XMPP client, and the application can process it successfully.
I also said "You can also filter messages! :)" that simply gets logged.

As a cool bonus I provide a screenshot. ;-)

Extra: Gradle JavaExec

To quickly run the application from the command line, Gradle provides a JavaExec task that can be used like this in build.gradle :

task(exec, dependsOn: classes, type: JavaExec) {
description = 'Run the application'
classpath = runtimeClasspath
main = 'id.co.bippo.camelxmppredo.App'
}

Extra: logback.xml Configuration File

Logback defaults to logging everything to stdout (which is much better than Log4j's default of not logging anything!)

I prefer logging only INFO level, this can be done by putting the following logback.xml in src/main/resources :

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <layout class="ch.qos.logback.classic.PatternLayout">
      <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
    </layout>
  </appender>
  <root level="info">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

Utopian dream: I can't help but imagine that had Logback used Gradle-style configuration it would look like this: ;-)

appenders { stdConsole() }
root.level = info

Running the Example

To make it really easy for you (especially to myself, as I tend to forget things!), I've made the examples freely available as camel-xmpp-redo project on GitHub.

You'll need Git and Gradle to checkout then build it (and JDK 1.6).
Gradle will automatically download and cache the required dependencies for you.

git clone git://github.com/ceefour/camel-xmpp-redo.git
cd camel-xmpp-redo
# edit the source and change XMPP credentials
gradle exec
Note: you must change the XMPP credentials used to login.

Easy XMPP with Camel !

Apache Camel is unobtrusive way to add flexibility of routing, messaging, and integration patterns with XMPP or other connectors supported by Camel.
(you can create your own connectors if you want...)

I highly suggest the Camel in Action Book for the best in-depth guide and examples for using Camel to develop your applications more productively.

4 comments:

  1. Nice blog entry. I added a link to it from the Camel articles page
    https://cwiki.apache.org/confluence/display/CAMEL/Articles

    ReplyDelete
  2. @Claus Thank you! I feel honored.

    I hope you don't mind if I also request the other article to be there too : http://spring-java-ee.blogspot.com/2010/12/implementing-asynchronous-observer.html

    ;-)

    ReplyDelete
  3. Yes I have added that 2nd article as well. Keep 'em coming :)

    ReplyDelete