jimfcarroll edited this page Apr 18, 2012 · 5 revisions

Overview

What is Dempsy?

In a nutshell, Dempsy is a framework that provides for the easy implementation of Stream-based, Real-time, BigData applications.

Dempsy is Nokia's "Distributed Elastic Message Processing System."

  • Dempsy is Distributed. That is to say, a Dempsy application can run on multiple JVMs on multiple physical machines.
  • Dempsy is Elastic. That is, it is relatively simple to scale an application to more (or fewer) nodes. This does not require code or configuration changes but allows the dynamic insertion and removal of processing nodes.
  • Dempsy is Message Processing. Dempsy fundamentally works by message passing. It moves messages between message processors, which act on the messages to perform simple atomic operations such as enrichment, transformation, or other processing. Generally an application is intended to be broken down into many small, simple processors rather than a few large, complex ones.
  • Dempsy is a Framework. It is not an application container like a J2EE container, nor a simple library. Instead, like the Spring Framework it is a collection of patterns, the libraries to enable those patterns, and the interfaces one must implement to use those libraries to implement the patterns.

What Problem is Dempsy solving?

Dempsy is not designed to be a general purpose framework, but is intended to solve a certain class of problems while encouraging the use of the best software development practices.

Dempsy is meant to solve the problem of processing large amounts of "near real time" stream data with the lowest lag possible; problems where latency is more important than "guaranteed delivery." This class of problems includes use cases such as:

  • Real time monitoring of large distributed systems
  • Processing complete rich streams of social networking data
  • Real time analytics on log information generated from widely distributed systems
  • Statistical analytics on real-time vehicle traffic information on a global basis

It is meant to provide developers with a tool that allows them to solve these problems in a simple straightforward manner by allowing them to concentrate on the analytics themselves rather than the infrastructure. Dempsy heavily emphasizes "separation of concerns" through "dependency injection" and out of the box supports both Spring and Guice. It does all of this by supporting what can be (almost) described as a "distributed actors model."

In short, Dempsy is a framework that enables decomposing a large class of message processing applications into flows of messages to relatively simple processing units implemented as POJOs.

What is a Distributed Actor Framework?

Dempsy has been described as a distributed actor framework. While not strictly speaking an actor framework in the sense of Erlang or Akka actors, in that actors typically send messages directly to other actors, the Message Processors in Dempsy are "actor like POJOs" similar to Processor Elements in S4 and, less so, like Bolts in Storm. Message Processors are similar to actors in that they act on a single message at a time, and need not deal with concurrency directly. Unlike actors, Message Processors are also relieved of the need to know the destination(s) for their output messages, as this is handled inside the Dempsy Distributor.

The Actors model is an approach to concurrent programming that has the following features:

  • Fine-grained processing

A traditional (linear) programming model processes input sequentially, maintaining whatever state is needed to represent the entire input space. In an Actor model, input is divided into messages and distributed to a large number of independent actors. An individual actor maintains only the state needed to process the messages that it receives.

  • Shared-Nothing

Each actor maintains its own state, and does not expose that state to any other actor. This eliminates concurrency bottlenecks and the potential for deadlocks. Immutable state (e.g., a road network artifact) may be shared between actors.

  • Message-Passing

Actors communicate by sending immutable messages to one-another. Each message has a key, and the framework is responsible for directing the message to the actor responsible for that key.
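The shared-nothing and message-passing features above can be sketched in a few lines of Java. This is a simplified illustration, not Dempsy code; the class names are our own:

```java
// Simplified sketch (not Dempsy code): an immutable, keyed message and an
// actor-like unit whose mutable state is private to it.
public class ActorSketch
{
   // Immutable message: final field, no setters; safe to pass between actors.
   public static final class WordMessage
   {
      private final String text; // doubles as the message key
      public WordMessage(String text) { this.text = text; }
      public String getKey() { return text; }
   }

   // Actor-like unit: only this instance ever touches its count.
   public static final class CounterActor
   {
      private long count = 0;
      public long handle(WordMessage m) { return ++count; }
   }

   public static void main(String[] args)
   {
      CounterActor actor = new CounterActor();
      WordMessage msg = new WordMessage("google");
      actor.handle(msg);
      System.out.println(actor.handle(msg)); // prints 2
   }
}
```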

A distributed actors model takes an additional step, of allowing actors to exist on multiple nodes in a cluster, and supporting communication of messages between nodes. It adds the following complexities to the Actors model:

  • Distribution of Messages

A message may or may not be consumed by an actor residing in the same JVM as the actor that sent the message. The required network communication will add delay to processing, and require physical network configuration to support bandwidth requirements and minimize impact to other consumers.

  • Load-Balancing

The framework must distribute work evenly between nodes, potentially using different strategies for different message types (e.g., regional grouping for a map-matcher, simple round-robin for vehicles).

  • Node Failure

If a node fails, the workload on that node must be shifted to other nodes. All state maintained by actors on the failed node is presumed lost.

  • Network Partition

If the network connection to a node temporarily drops, it will appear as a node failure to other nodes in the cluster. The node must itself recognize that it is no longer part of the cluster, and its actors must stop sending messages (which may conflict with those sent by the cluster's "replacement" node).

  • Node Addition

To support elastic scalability (adding nodes on demand to service load, as well as re-integration of a previously failed node), the framework must support redistribution of actors and their state based on changes to the cluster.
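To make the load-balancing point above concrete, here is a sketch (our own illustration, not Dempsy's routing code) of two interchangeable distribution strategies a framework might apply per message type:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch (not Dempsy's routing code): two interchangeable ways a framework
// might assign work to nodes.
public class StrategySketch
{
   public interface Strategy { int nodeFor(Object key, int nodeCount); }

   // Keyed grouping: the same key always lands on the same node.
   public static final Strategy KEYED = new Strategy()
   {
      public int nodeFor(Object key, int n)
      {
         return (key.hashCode() & 0x7fffffff) % n; // mask the sign bit before mod
      }
   };

   // Round-robin: keys are ignored; work is spread evenly in turn.
   public static final class RoundRobin implements Strategy
   {
      private final AtomicInteger next = new AtomicInteger();
      public int nodeFor(Object key, int n) { return next.getAndIncrement() % n; }
   }

   public static void main(String[] args)
   {
      System.out.println(KEYED.nodeFor("google", 4) == KEYED.nodeFor("google", 4)); // prints true
      Strategy rr = new RoundRobin();
      System.out.println(rr.nodeFor("a", 4)); // prints 0
      System.out.println(rr.nodeFor("b", 4)); // prints 1
   }
}
```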

Guiding philosophy

Above all, and in many ways, Dempsy is meant to be SIMPLE. It doesn't try to be the solution for every problem. It tries to do one thing well, and it is meant to support developers who think this way. Dempsy is built around, and built to emphasize, several interrelated principles. These principles are meant to reduce the longer-term total cost of ownership of the software written using Dempsy. They include:

  • Separation of Concerns (SoC) - Dempsy expects the developer to be able to concentrate on writing the analytics and business logic with (virtually) no consideration for framework or infrastructure.

  • Decoupling - SoC provides the means to isolate cross-cutting concerns so that code written for Dempsy has little to no (with due respect to annotations) dependence on even the framework itself. The developer's code is easily separable from the framework and, in the spirit of Dependency Injection, the framework uses the developer's code rather than the developer being required to use the framework. This type of decoupling provides for analytics/business code that has no infrastructure concerns: no framework dependencies, no messaging code, no threading code, etc.

  • Testability - All of this provides for code that's more testable in isolation from these concerns.

  • "Do one thing well" - Dempsy is written to provide one service: support for the "Distributed Actors Model" (with all of the acknowledged caveats) programming paradigm. For this reason it does not pretend to be an Application Server, nor does it substitute for the lack of an automated provisioning/deployment system.

Getting Started

What does a Dempsy application look like - The standard WordCount example

In order to understand how to use Dempsy you need to understand how a Dempsy application is structured and the best way to see this is through a simple example. The "Hello World" of the BigData applications seems to be the "word counter." If you're familiar with Hadoop then you probably started with the WordCount example. In this simple example let's suppose we have a source of words from some text. In traditional batch based BigData systems the source for these words would be a file or perhaps already partitioned across some distributed storage. In a Dempsy application we're receiving these words in real-time through a data stream. Maybe we're getting all of the live data from a large social media application and we want to calculate the histogram of word occurrences.

Preliminary build setup

If you're doing more than simply reading this section, you'll need to add the Dempsy dependencies to your build as we walk through the examples. This should be as simple as including the following dependency in your Maven pom.xml file (or the Gradle equivalent).

<dependency>
   <groupId>com.nokia.dempsy</groupId>
   <artifactId>lib-dempsyapi</artifactId>
   <version>${dempsy.version}</version>
   <scope>provided</scope>
</dependency>

The current Dempsy version is 0.5, so you will also need to define the dempsy.version property in your pom.xml or use the version directly in the dependency declaration. The scope is provided because Dempsy is packaged separately from your application.
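Assuming the standard Maven property mechanism, the dempsy.version placeholder used above could be declared like this in the same pom.xml:

```xml
<properties>
   <dempsy.version>0.5</dempsy.version>
</properties>
```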

The Message Processor Prototype

What would the Dempsy application that accomplishes the above described feat look like? Imagine that each word from this hypothetical live stream of text is broken into its own message. Each of these messages is routed to an instance of a class (let's call that instance a message processor) that has the responsibility to keep track of the count for a single word. That is, there is an instance of a word-counting message processor per distinct word. For example, every time the word "Google" flows through the stream it's routed to the same message processor (the one dedicated to the word "Google"). Every time the word "is" is encountered, it's routed to another message processor instance. And likewise with the word "Skynet."

How easy would it be to write the code for that message processor class? This way of looking at the problem, that is in a functional programming manner, makes the code fairly simple. It could be as simple as this:

class WordCount
{
  private long count = 0;

  public void countWord(String word)
  {
     count++;
  }
}

Notice, we write the message processor in such a way that we assume each instance is responsible for a single word and that in the larger application there will be many instances, each operating on its piece of the stream. These instances can be (and usually are) spread out over a large number of machines.

Of course, what's missing? How does each word get to its respective message processor? How are the WordCount instances instantiated, deleted, provided their word message? Where are they instantiated? What about synchronization? What about sending out messages? All of these are the primary responsibility of Dempsy, as will be explained.

Message Processor Addressing

At this point we have a nice little piece of POJO functionality completely unmoored from infrastructural concerns. Let's look at how Dempsy handles some of these concerns. As mentioned, one of Dempsy's primary responsibilities is, given a message, find the message processor responsible for handling that message. Now that we have a POJO that accomplishes some business functionality we need to tell Dempsy how they are to be addressed, that is, how Dempsy is to find which message processor is responsible for which messages.

The way Dempsy does this is through the use of a message key. Each message that flows through Dempsy needs to have a message key. Dempsy is annotation driven, so classes that represent messages need to identify a means of obtaining the message's MessageKey through the use of an annotation. An important concept to grasp here is that a MessageKey is essentially the address of a MessageProcessor instance.

In our example, each word is a message.

import java.io.Serializable;

import com.nokia.dempsy.annotations.MessageKey;

public class Word implements Serializable
{
   private static final long serialVersionUID = 1L;

   private String wordText;

   public Word(String wordText)
   {
      this.wordText = wordText;
   }

   @MessageKey
   public String getWordText()
   {
      return wordText;
   }
}

So when Dempsy receives a message of type Word, it retrieves the key using the annotated method getWordText(). That key will become the address of a Message Processor somewhere on the system. Dempsy will find the Message Processor instance (in this case an instance of the class WordCount) within a cluster of nodes responsible for running the WordCount message processing. In the case that the instance doesn't already exist, Dempsy will clone() a WordCount instance prototype.
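That lookup-or-clone behavior can be pictured with the following simplified sketch. This is our own illustration, not Dempsy's container implementation, and WordCountLike stands in for a cloneable message processor prototype:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of a container that finds the instance for a key,
// cloning the prototype on the first message for that key.
public class ContainerSketch
{
   public static class WordCountLike implements Cloneable
   {
      public long count = 0;
      public Object clone() throws CloneNotSupportedException { return super.clone(); }
   }

   private final WordCountLike prototype = new WordCountLike();
   private final Map<Object, WordCountLike> instances = new HashMap<Object, WordCountLike>();

   public WordCountLike instanceFor(Object key)
   {
      WordCountLike mp = instances.get(key);
      if (mp == null)
      {
         // first message for this key: clone the prototype
         try { mp = (WordCountLike) prototype.clone(); }
         catch (CloneNotSupportedException e) { throw new RuntimeException(e); }
         instances.put(key, mp);
      }
      return mp;
   }

   public static void main(String[] args)
   {
      ContainerSketch container = new ContainerSketch();
      container.instanceFor("google").count++;
      container.instanceFor("google").count++;
      container.instanceFor("is").count++;
      System.out.println(container.instanceFor("google").count); // prints 2
      System.out.println(container.instanceFor("is").count);     // prints 1
   }
}
```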

If you're paying attention you might notice there are two gaps that need to be filled in the WordCount implementation. First, how does Dempsy understand that WordCount handles the Word message? And second, how is a WordCount prototype cloned (notice the existing WordCount class is not (yet) Cloneable)?

This requires us to revisit the WordCount implementation. We need to do several things to satisfy Dempsy:

  1. We need to identify the WordCount class as a MessageProcessor which is done with a class annotation
  2. We need to identify the WordCount.countWord() call as the point where the MessageProcessor handles the message.
  3. WordCount.countWord() needs to be annotated with the @MessageHandler annotation.
  4. WordCount.countWord() needs to take the actual message type. In this case, Word
  5. We need to make sure WordCount is Cloneable.

This would be accomplished by the following:

import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageProcessor;

@MessageProcessor
class WordCount implements Cloneable
{
  private long count = 0;

  @MessageHandler
  public void countWord(Word word)
  {
     count++;
  }

  public Object clone() throws CloneNotSupportedException
  {
     return (WordCount)super.clone();
  }
}

The Dempsy framework now has enough information that it can understand:

  1. How to create instances of the WordCount message processor given an already existing instance it will use as a prototype.
  2. That instances of WordCount handle messages of type Word using the WordCount.countWord() method.
  3. That the key for a given Word message, which represents the "address" of a unique WordCount instance to which the Word message should be routed, is provided by a call on the message instance, Word.getWordText().

Note: Currently the default implementation of the Serializer is the standard Java serializer. This will be changed prior to a 1.0 release, but in the meantime it should be noted that the current serializer requires the message to implement Serializable.

a word about the message key

It is critical that the Object Dempsy obtains as the message key (in the example, the result of the call to Word.getWordText()) has the appropriate identity semantics. In all cases that means there need to be non-default equals() and hashCode() methods. The reason for this is partly obvious: a "unique" message key corresponds to an instance of a message processor, so it's important to get the meaning of "unique" correct, and the default Object behavior is not adequate. Think of Dempsy as using the key as if it were a key in a HashMap that contained all of the current message processor instances. The default implementations of Object.equals and Object.hashCode wouldn't work given multiple instantiations of the same Word.

But this is not all. Given that instances of a message processor are distributed across many nodes, the default routing behavior of Dempsy uses the hashCode() as a means of determining which node a particular message processor is running on. Therefore, while strictly speaking most Java applications would work (though very poorly) if, for example, the hashCode() method were implemented to simply return 1, this would cause ALL message processors to be instantiated on the same node of the cluster.
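A quick sketch of that point. This is our own illustration of hash-based node selection, not Dempsy's actual routing code:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a constant hashCode() defeats distribution entirely, while a
// real hashCode() lets keys spread across nodes.
public class DegenerateHashSketch
{
   static int nodeFor(int hash, int nodeCount)
   {
      return (hash & 0x7fffffff) % nodeCount; // mask the sign bit before mod
   }

   public static void main(String[] args)
   {
      String[] words = { "google", "is", "skynet" };

      Set<Integer> nodesUsed = new HashSet<Integer>();
      for (String word : words)
         nodesUsed.add(nodeFor(1 /* a degenerate, constant hashCode */, 4));
      System.out.println(nodesUsed.size()); // prints 1: every key on the same node

      nodesUsed.clear();
      for (String word : words)
         nodesUsed.add(nodeFor(word.hashCode(), 4));
      System.out.println(nodesUsed.size() > 1); // prints true: keys spread out
   }
}
```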

In the example, the MessageKey is a java.lang.String, which has appropriate identity semantics. Note: the message key is not restricted to String; it can be any type with proper equals() and hashCode() implementations.
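For instance, a hypothetical composite key (WordInLanguage is our invention, not part of the example) would need value-based identity:

```java
// Hypothetical composite message key: value-based equals() and hashCode()
// give it the identity semantics a message key requires.
public class WordInLanguage
{
   private final String word;
   private final String language;

   public WordInLanguage(String word, String language)
   {
      this.word = word;
      this.language = language;
   }

   public boolean equals(Object o)
   {
      if (this == o) return true;
      if (!(o instanceof WordInLanguage)) return false;
      WordInLanguage other = (WordInLanguage) o;
      return word.equals(other.word) && language.equals(other.language);
   }

   public int hashCode()
   {
      return word.hashCode() * 31 + language.hashCode();
   }

   public static void main(String[] args)
   {
      // Two separately constructed keys for the same word must be "the same" key.
      System.out.println(new WordInLanguage("google", "en")
            .equals(new WordInLanguage("google", "en"))); // prints true
   }
}
```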

Adaptors

So where do Word messages come from and how do they get to Dempsy in order to be routed to the appropriate WordCount message processor? Dempsy provides an interface that needs to be implemented by the application developer in order to adapt sources of stream data to the Dempsy message bus. An Adaptor implementation:

  1. will be given a handle to the Dempsy message bus through an interface called a Dispatcher.
  2. will need to obtain data from an external source and use the Dispatcher to send that data on to Dempsy.

The API for an Adaptor is very simple so we will extend the Word Count example with the following class:

import com.nokia.dempsy.Adaptor;
import com.nokia.dempsy.Dispatcher;

public class WordAdaptor implements Adaptor
{
   private Dispatcher dempsy;
   private volatile boolean running = false; // volatile: stop() is called from another thread

   @Override
   public void setDispatcher(Dispatcher dispatcher)
   {
      this.dempsy = dispatcher;
   }

   @Override
   public void start()
   {
      running = true;
      while (running)
      {
         // obtain data from an external source
         String wordString = ... set this from an external source ...

         dempsy.dispatch(new Word(wordString));
      }
   }

   @Override
   public void stop()
   {
      running = false;
   }
}

The WordAdaptor will be instantiated by Dempsy and provided a handle to a Dispatcher. Then Adaptor.start() will be called. The application developer is responsible for creating Dempsy-compliant messages (as described above, a message should be Serializable, at least for now, and have a means of obtaining a MessageKey identified by the @MessageKey annotation) using data from an external source.

Notice the lifecycle: start() is called by the framework, and in this example it never exits. If it ever does exit, it will not be called again without restarting the node the Adaptor was instantiated in. Note: it's very important that you manage this. You are allowed to exit the start() method whenever you want, either because the Adaptor is finished (if such a case exists) or because you decided to do the work in another thread (or many other threads), but Dempsy will not re-invoke the start() method.

Dempsy will invoke the stop() method to shut down the Adaptor when the node shuts down. Well-behaved Adaptors must return from start() at this point, if they haven't already. Not doing so will hang the VM on exit since, by default, the Adaptor is run in a non-daemon thread (though this is a configurable option for ill-behaved Adaptors).
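One common shape for a well-behaved Adaptor is to spawn its own worker thread and return from start() immediately. The sketch below is self-contained, so the Dispatcher interface here is a minimal stand-in for com.nokia.dempsy.Dispatcher, not the real one:

```java
// Sketch of an adaptor that does its work on its own thread. Dispatcher is
// a stand-in for the Dempsy interface so the sketch is runnable on its own.
public class ThreadedAdaptorSketch
{
   public interface Dispatcher { void dispatch(Object message); }

   public static class ThreadedAdaptor
   {
      private final Dispatcher dispatcher;
      private volatile boolean running = false; // volatile: stop() runs on another thread
      private Thread worker;

      public ThreadedAdaptor(Dispatcher dispatcher) { this.dispatcher = dispatcher; }

      // start() spawns the worker and returns; the framework won't call it again.
      public void start()
      {
         running = true;
         worker = new Thread(new Runnable()
         {
            public void run()
            {
               while (running)
                  dispatcher.dispatch("a word"); // placeholder for real source data
            }
         });
         worker.start();
      }

      // stop() must make the worker exit so the VM can shut down cleanly.
      public void stop()
      {
         running = false;
         try { worker.join(); }
         catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      }
   }

   public static void main(String[] args) throws InterruptedException
   {
      final long[] dispatched = { 0 };
      ThreadedAdaptor adaptor = new ThreadedAdaptor(new Dispatcher()
      {
         public void dispatch(Object message) { dispatched[0]++; }
      });
      adaptor.start(); // returns immediately
      Thread.sleep(50);
      adaptor.stop();
      System.out.println(dispatched[0] > 0); // prints true
   }
}
```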

Application Definition

Dempsy uses, and heavily emphasizes, a Dependency Injection based approach to application development. The configuration of an application can use any DI container the user wants to use. Out of the box Dempsy operates with Spring and Guice. The examples that follow will show how to do the configuration by hand which means the translation to any DI container should be obvious to the users of those containers. We will also include Spring examples.

At this point we should begin to have an understanding of what a Dempsy application is. It's a series of instances of message processors across a number of compute nodes, being routed messages based on their keys, and being supplied message data by Adaptors. The configuration of an application is simply a formalization of these specifics. To define (configure) the Word Count application we've been walking through, we simply lay out the specifics. Doing this programmatically we would have:

import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;

 ...
      ApplicationDefinition myWordCountApplication =
        new ApplicationDefinition("word-count").add(
            new ClusterDefinition("adaptor", new WordAdaptor()),
            new ClusterDefinition("mp", new WordCount()));
 ...

Notice what we are doing here. We are defining the topology of a Dempsy application (and accepting all of the defaults). The application, called "word-count," consists of two clusters, "adaptor" and "mp," the first of which contains our Adaptor which, as we have seen, sources Word messages. This is followed by a message processor whose prototype is an instance of WordCount.

Although messages coming from the WordAdaptor flow to the WordCount message processor, the order in the definition doesn't actually matter. Dempsy determines where messages are sent based on the type of the message and the type of object that the MessageHandler on the MessageProcessor takes. In the case of our example, when the WordAdaptor adaptor produces a message of type Word, Dempsy knows that message can be handled by the WordCount message processor because the method WordCount.countWord() (which is annotated with the @MessageHandler annotation) takes the type Word. If there are other message processors that also have handlers that take a Word the messages will be routed to the appropriate message processor within those clusters also.
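A simplified picture of that type matching follows. This is our own sketch; the @Handler annotation below is a stand-in for Dempsy's @MessageHandler, and the real framework does considerably more:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Sketch of type-driven routing: a message can be delivered to a prototype
// if the prototype declares an annotated handler taking the message's type.
public class HandlerDiscoverySketch
{
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   public @interface Handler {}

   public static class WordCountLike
   {
      @Handler
      public void countWord(String word) { }
   }

   // Returns true if the prototype declares a @Handler taking the message's type.
   public static boolean canHandle(Class<?> prototype, Object message)
   {
      for (Method m : prototype.getMethods())
         if (m.isAnnotationPresent(Handler.class)
               && m.getParameterTypes().length == 1
               && m.getParameterTypes()[0].isAssignableFrom(message.getClass()))
            return true;
      return false;
   }

   public static void main(String[] args)
   {
      System.out.println(canHandle(WordCountLike.class, "google"));            // prints true
      System.out.println(canHandle(WordCountLike.class, Integer.valueOf(42))); // prints false
   }
}
```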

What do we do with the ApplicationDefinition? That depends on the Dependency Injection framework you're using. If using either Spring or Guice you don't need to do much else to run your application. If you're using a different dependency injection container then you'll need to obtain a reference to the Dempsy object and give it the ApplicationDefinition, but this is a more advanced topic for a later section. Moving forward we will show you how the Spring implementation works.

The above application definition could be defined using Spring as follows:

<beans>
  <bean class="com.nokia.dempsy.config.ApplicationDefinition">
    <constructor-arg value="word-count" />
    <property name="clusterDefinitions">
      <list>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="adaptor"/>
          <property name="adaptor">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
          </property>
        </bean>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="mp"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>

A Google Guice example isn't quite as straightforward since Guice is somewhat backward from more traditional DI frameworks. Spring's main unit of work is the 'bean' or instance. Guice centers on the 'dependency.' This makes Guice much more limiting than Spring for anything but trivial use cases or object graphs built from true singletons that assume a "service" type system rather than "component" model. This problem leads to what the Google Guice documentation calls 'the (supposed) "robot legs" problem' (note, you will need to scroll down in the FAQ to find it). Notice we want to define an ApplicationDefinition with multiple ClusterDefinitions each configured with their own user defined classes. This leads immediately to the "robot legs" problem described.

a word about the message processor lifecycle

The message processor prototype is instantiated on start-up. Therefore you have access to all of the DI framework (in this example Spring) capabilities for configuring your prototype.

Keep in mind how the life-cycle of a message processor works. There is a reason this is called a prototype. The instance that's instantiated on start-up by the dependency injection framework serves as a template for future instances. Dempsy will use the "clone" method to create more instances from this prototype. You need to consider that when you configure and initialize your message processor prototype and when you write the "clone" method.

Also note, while it shouldn't really matter, there will be a prototype created within each node that the specific message processor will be running in. However, it is possible that instances within one node were instantiated from cloning a prototype within another node and then that instance was migrated as part of Dempsy's elastic behavior.
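A sketch of what that means for your clone() implementation follows. This is our own example with hypothetical field names: configuration injected into the prototype can be carried over by reference, while per-instance mutable state needs a fresh copy.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a prototype carries configuration set once by the DI container,
// while per-instance mutable state must be given a fresh copy in clone().
public class PrototypeSketch implements Cloneable
{
   private String greeting = "hello";                   // configuration, set by the container
   private List<String> seen = new ArrayList<String>(); // per-instance state

   public void setGreeting(String g) { greeting = g; }
   public String getGreeting() { return greeting; }
   public List<String> getSeen() { return seen; }

   public void handle(String word) { seen.add(word); }

   public Object clone()
   {
      try
      {
         PrototypeSketch copy = (PrototypeSketch) super.clone();
         copy.seen = new ArrayList<String>(); // fresh state: don't share the prototype's list
         return copy;
      }
      catch (CloneNotSupportedException e) { throw new RuntimeException(e); }
   }

   public static void main(String[] args)
   {
      PrototypeSketch prototype = new PrototypeSketch();
      prototype.setGreeting("hi"); // as the DI container might configure it

      PrototypeSketch instance = (PrototypeSketch) prototype.clone();
      instance.handle("google");
      System.out.println(instance.getGreeting());     // prints hi: configuration carried over
      System.out.println(prototype.getSeen().size()); // prints 0: state not shared
   }
}
```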

Terminology

At this point in the example we should have an understanding of what a Dempsy application is. Now we need to clarify some terminology that's been developed and will continue to be used throughout this document as well as throughout the Dempsy code.

  • message processor: an instance of a cloned message processor prototype responsible for processing every message of a particular type with a particular unique key.
  • message processor prototype: an instance used by Dempsy as a template when it needs to create more instances of a message processor.
  • message: an object that Dempsy routes to a message processor based on the message's key.
  • key: obtained from a message using the method on the message object that's annotated with the @MessageKey annotation. Each unique key addresses an individual message processor instance in a cluster.
  • cluster: the collection of all message processors or adaptors of a common type in the same stage of processing of a Dempsy application. A cluster contains a complete set of potential message processors keyed by all of the potential keys from a particular message type. That is, a cluster of message processor instances covers the entire key-space of a message.
  • node: a subset of a cluster containing a portion of the cluster's message processors. Nodes are almost always (except in some possible test situations) the intersection of a cluster and a Java VM process. That is, the portion of a cluster that's running in a particular process is the cluster's node.
  • container: sometimes also referred to as a message processor container, it is the part of the Dempsy infrastructure that manages the lifecycle of message processors within an individual node. That is, there is a one-to-one relationship between a node and a container when limiting the view to a cluster. In single-VM mode there may be more than one cluster per VM, but still one container per cluster.

Message Processor Scheduled Output

It is possible to schedule calls on the MessageProcessor instances so that they don't have to be completely message driven. We do this by marking which methods are to be called using the @Output annotation. Suppose, in our example, we wanted to periodically output the word counts. We would add the method:

...

  @Output
  public void outputResults()
  {
     System.out.println(myword.getWordText() + ":" + count);
  }

  ...

The above example assumes we've saved off the Word message that was first passed to us in the variable myword. Now we need to actually schedule the output. This is done in the application configuration. For example, the above Spring based application definition would be extended as so:

...
       <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="mp"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
          </property>
          <property name="outputSchedule">
             <bean class="com.nokia.dempsy.config.OutputSchedule">
               <constructor-arg name="interval" value="60"/>
               <constructor-arg name="timeUnit">
                  <bean class="java.util.concurrent.TimeUnit" factory-method="valueOf">
                      <constructor-arg value="SECONDS"/>
                  </bean>
               </constructor-arg>
             </bean>
          </property>
       </bean>
...

Notice the cluster definition for the WordCount MessageProcessor now contains an outputSchedule that will cause the WordCount.outputResults() method to be called on every WordCount MessageProcessor instance every 60 seconds. In the example it simply writes the word and its count up to that point to the standard output.

The Processing Pipeline

Up to now, in this example, the pipeline is two stages. But the first stage (the Adaptor) only provides the data. There's only one stage of processing. Dempsy is meant to be used in a pipeline of message processor clusters. Let's extend our example to another stage in the processing pipeline. What if we want to rank words we're counting based on usage? We will define a new message processor prototype that receives CountedWords and keeps the top 10.

Dempsy will forward, as a new message, any value returned from a MessageHandler or Output invocation. So first we must update the existing output method on the message processor as so:

  @Output
  public CountedWord outputResults()
  {
     return new CountedWord(myword,count);
  }

Now when output is invoked, the CountedWord will be returned rather than simply printed out. Dempsy will forward that message on as long as it's a valid Dempsy message (again, a valid Dempsy message is one that is Serializable and has a means of getting a valid message key via a method annotated with @MessageKey). For this example we could have:

public class CountedWord implements Serializable
{
   private static final long serialVersionUID = 1L;

   private long count;
   private String wordText;

   public CountedWord(Word message, long count)
   {
      wordText = message.getWordText();
      this.count = count;
   }

   public long getCount() { return count; }

   public String getWordText() { return wordText; }

   @MessageKey
   public Integer getKey() { return 1; }

   public String toString() { return wordText + ":" + count; }
}

A few things to notice. First, the MessageKey is always the same. The message design is part of the application and in this case the next stage of message processors will include only a single instance, by design. This will be the case since there is only a single message key ever created from a CountedWord message - an Integer with the value of '1'.

Also note, Dempsy error messages are meant to be extensive. It is the philosophy of the Dempsy creators that error messages should point exactly to the problem. Messages that don't indicate exactly what's wrong are useless for developers. In keeping with that spirit Dempsy will use the toString on objects supplied to it in error messages. For those messages to be as clear as possible you really should provide a non-default toString implementations for all messages, message processor classes, and adaptor implementations. The above example keeps with that rule-of-thumb.

Now, what will CountedWords be routed to? A WordRank message processor, of course. For example:

@MessageProcessor
public class WordRank implements Cloneable
{
   private Comparator<CountedWord> comparator = new Comparator<CountedWord>()
   {
      @Override
      public int compare(CountedWord o1, CountedWord o2)
      {
         // compare by count, breaking ties on the word itself so that distinct
         // words with equal counts aren't collapsed as duplicates by the TreeSet
         long o1c = o1.getCount();
         long o2c = o2.getCount();
         return o1c != o2c ? (o1c < o2c ? -1 : 1) : o1.getWordText().compareTo(o2.getWordText());
      }
   };

   private TreeSet<CountedWord> topTen = new TreeSet<CountedWord>(comparator);

   @MessageHandler
   public void handleCount(CountedWord countedWord)
   {
      topTen.add(countedWord);

      if (topTen.size() > 100)
         trim();
   }

   @Output
   public void outputResults()
   {
      trim();

      for (Iterator<CountedWord> iter = topTen.descendingIterator(); iter.hasNext();)
      {
         CountedWord cur = iter.next();
         System.out.println(cur.getWordText() + ":" + cur.getCount());
      }
   }

   public Object clone() throws CloneNotSupportedException
   {
      return (WordRank)super.clone();
   }

   private void trim()
   {
      TreeSet<CountedWord> newTopTen = new TreeSet<CountedWord>(comparator);
      Iterator<CountedWord> iter = topTen.descendingIterator();
      for (int i=0; i < 10 && iter.hasNext(); i++)
         newTopTen.add(iter.next());
      topTen = newTopTen;
   }

}

Important things to note here:

  1. The WordRank message processor doesn't do any synchronization. There's no concern, for example, that while the topTen TreeSet is being manipulated in the outputResults() method it might change in the handleCount() method. Since Dempsy invokes both methods, it will not invoke them concurrently.
  2. The handleCount() method is optimized to run as quickly as possible. If the message processor is busy when another message comes in, that message will be lost, so message handling needs to be kept short. In this case we only trim the set periodically. Note that outputResults() is also kept short, because an @Output method that is busy when messages arrive for the message handler will likewise cause message loss.
  3. The WordRank includes an @Output method so we can periodically view the top ten.
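
The amortized-trim pattern used by handleCount() and trim() can be sketched in isolation. This is an illustrative stand-alone class, using plain Integers rather than CountedWords for brevity: additions are cheap until a buffer threshold is crossed, and only then is the expensive trim paid.

```java
import java.util.Iterator;
import java.util.TreeSet;

// Illustrative sketch of the amortized-trim pattern: keep a generous buffer
// so most add() calls do no trimming work at all.
public class AmortizedTopK
{
   private static final int TOP_K = 10;
   private static final int BUFFER = 100;

   private TreeSet<Integer> top = new TreeSet<Integer>();

   public void add(int value)
   {
      top.add(value);
      if (top.size() > BUFFER)   // trim rarely, not on every message
         trim();
   }

   // Callers that need an exact result (like an @Output method) trim first.
   public TreeSet<Integer> current()
   {
      trim();
      return top;
   }

   private void trim()
   {
      TreeSet<Integer> next = new TreeSet<Integer>();
      Iterator<Integer> iter = top.descendingIterator();
      for (int i = 0; i < TOP_K && iter.hasNext(); i++)
         next.add(iter.next());
      top = next;
   }
}
```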

Of course, our application has now expanded, so we need to expand our ApplicationDefinition by adding a new ClusterDefinition. Again, the order of the ClusterDefinitions within the ApplicationDefinition doesn't matter.

...
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="word-rank"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordRank"/>
          </property>
          <property name="outputSchedule">
               <!-- The original snippet placed constructor-args directly inside the
                    property, which Spring rejects; they need a wrapping bean. The
                    class shown here assumes Dempsy's RelativeOutputSchedule -
                    check the class name against your Dempsy version. -->
               <bean class="com.nokia.dempsy.output.RelativeOutputSchedule">
                  <constructor-arg name="interval" value="30"/>
                  <constructor-arg name="timeUnit">
                     <bean class="java.util.concurrent.TimeUnit" factory-method="valueOf">
                         <constructor-arg value="SECONDS"/>
                     </bean>
                  </constructor-arg>
               </bean>
          </property>
...

Running the Dempsy application

There are two modes in which you can run a Dempsy application. It can run entirely within a local Java VM, or it can run distributed across a set of machines. For the purposes of this "Getting Started" guide we will demonstrate how to run it in a local Java VM, which is easy to set up in an IDE like Eclipse.

There are several "main" implementations provided, depending on which mode you're running a Dempsy application in and which DI framework you're using. In fact, the only place any particular DI container is assumed is in these supplied "main" applications, so adding support for other, currently unsupported DI containers is trivial.

If you're using Spring you will need the following build dependency:

<dependency>
   <groupId>com.nokia.dempsy</groupId>
   <artifactId>lib-dempsyspring</artifactId>
   <scope>provided</scope>
   <version>${dempsy.version}</version>
</dependency>

If you're using Guice you should include the following build dependency:

<dependency>
   <groupId>com.nokia.dempsy</groupId>
   <artifactId>lib-dempsyguice</artifactId>
   <scope>provided</scope>
   <version>${dempsy.version}</version>
</dependency>

There are two "main" implementations (along with their DI-framework-specific configurations) for each container, one for each of the two default "modes" (local Java VM or distributed). When running in the default "local Java VM" mode, every cluster is instantiated within the same VM, and each cluster contains only one node (see the section on "Terminology" if this statement isn't clear). To start the application in a local JVM you run:

java -Dapplication=WordCount.xml -cp [classpath] com.nokia.dempsy.spring.RunAppInVm

This "main" application is Spring based. It assumes that your application context xml file is named by the "-Dapplication" system property and can be found on the classpath. In the example above that is "WordCount.xml", which would need to be placed somewhere on the classpath.

To start the Dempsy application on a distributed system you need to start each node with the following command:

java -Dapplication=WordCount.xml -Dcluster=_ClusterToStart_ -Dzk_connect=_ZookeeperConnectUrlString_ [-Dzk_session_timeout=5000] [-Dtotal_slots_for_cluster=100] [-Dmin_nodes_for_cluster=3] -cp (classpath) com.nokia.dempsy.spring.RunNode

For our example we would need to:

  1. Make sure there is a ZooKeeper ensemble running and get its connect string (see the ZooKeeper documentation if this is unfamiliar).
  2. Start the WordRank message processor cluster on one or more nodes (if fewer than 3, you need to supply the -Dmin_nodes_for_cluster parameter).
  3. Start the WordCount message processor cluster on one or more nodes (if fewer than 3, you need to supply the -Dmin_nodes_for_cluster parameter).
  4. Start the WordAdaptor cluster on one or more nodes (adaptors don't have a minimum-number-of-nodes setting).
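
Concretely, those starts might look like the following command sketch. The cluster names word-count and word-adaptor are assumptions based on the word-rank name used in the ClusterDefinition above; substitute your own cluster names, ZooKeeper connect string, and classpath.

```
# Start one node of each cluster; repeat on more machines for more nodes.
# Cluster names other than word-rank are illustrative assumptions.
java -Dapplication=WordCount.xml -Dcluster=word-rank \
     -Dzk_connect=zkhost1:2181,zkhost2:2181 -Dmin_nodes_for_cluster=1 \
     -cp [classpath] com.nokia.dempsy.spring.RunNode

java -Dapplication=WordCount.xml -Dcluster=word-count \
     -Dzk_connect=zkhost1:2181,zkhost2:2181 -Dmin_nodes_for_cluster=1 \
     -cp [classpath] com.nokia.dempsy.spring.RunNode

java -Dapplication=WordCount.xml -Dcluster=word-adaptor \
     -Dzk_connect=zkhost1:2181,zkhost2:2181 \
     -cp [classpath] com.nokia.dempsy.spring.RunNode
```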

Overview Reprise

Now that we've walked through a Dempsy application, it's important to take a step back and review what Dempsy actually is and what it's not. Dempsy is a framework for developing systems that process large amounts of real-time data with the least possible lag. As such, by default (this can be changed), Dempsy prefers reducing lag OVER guaranteed message delivery. As Milton Friedman titled his book, "there's no such thing as a free lunch." Regardless of what Twitter says, you cannot have a "real-time" system and a "guaranteed-delivery" system at the same time, at all times. Dempsy, by default at least, opts for the former.

Imagine a stream of data simply too large for the current system to process. One of two things must happen: either lag is introduced, or messages are dropped. By default Dempsy opts to shed messages rather than produce late results. That's why the system is ideal for analytics but not for transactions: it's great for calculating averages, not so great for getting an exact count.
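
That trade-off can be illustrated outside Dempsy with a bounded hand-off queue. Blocking on a full queue would preserve every message at the cost of lag; a non-blocking offer keeps latency flat by shedding instead. This sketch is purely illustrative and is not Dempsy's actual transport code:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Illustrative only: when the consumer falls behind, send() drops the new
// message (and counts it) rather than waiting for space - the "shed
// messages, don't add lag" choice Dempsy makes by default.
public class ShedDemo
{
   private final ArrayBlockingQueue<String> queue =
         new ArrayBlockingQueue<String>(2);  // tiny capacity for the demo
   private long dropped = 0;

   public boolean send(String message)
   {
      // offer() returns immediately; a blocking put() here would trade
      // this message loss for ever-growing latency instead.
      boolean accepted = queue.offer(message);
      if (!accepted)
         dropped++;
      return accepted;
   }

   public long getDropped() { return dropped; }
}
```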

However, since Dempsy is itself built on a set of abstractions, the framework can be modified to shift the priority between low-lag/real-time and guaranteed delivery. Users should consider the type of application they are building and what they are trying to accomplish before supplying an alternative transport mechanism.

Understanding The Dempsy codebase

Developers that use Dempsy fall into two categories. "Application developers" want to use Dempsy to develop a stream-based BigData application. Then there are those who want to extend or modify the framework itself; we'll call those developers "framework developers."

The codebase is broken into artifacts that reflect this understanding of the developer community. The Dempsy core codebase is broken into these three layers:

  • The Application Developer Api (lib-dempsyapi). The jar artifact lib-dempsyapi contains all of the annotations, interfaces, exceptions, and configuration classes required by the application developer.
  • The Dempsy Framework Api (lib-dempsycore). The jar artifact lib-dempsycore contains the set of core abstractions that Dempsy itself is built on internally. It's meant as an api for those that want to extend the framework itself. For example, if someone doesn't like the message transport implementations that come out-of-the-box with Dempsy, they can implement their own and plug it in.
  • The Default Dempsy Implementation (lib-dempsyimpl). The jar artifact lib-dempsyimpl contains the default implementations for the framework Api (from lib-dempsycore) as well as a set of required concrete classes.

Dempsy also currently includes two runtime libraries that each support the startup and configuration using a particular Dependency Injection container.

  • Spring based startup and configuration (lib-dempsyspring). This library contains the "main" methods for starting Dempsy using Spring. The code here assumes that the ApplicationDefinition will be accessible to the Spring application context.
  • Guice based startup and configuration (lib-dempsyguice). This library contains the "main" methods for starting Dempsy using Google Guice. The code here assumes that the ApplicationDefinition will be accessible to the Guice application module.

The Application Developer Api

lib-dempsyapi contains all of the classes, interfaces, and annotations that an "application developer" would need. Some of those annotations we've seen already. The table below lists all of the currently available ones; please review the javadocs for more information on each.

| Annotation | Description |
|------------|-------------|
| @MessageProcessor | A class annotation that identifies a class as a message processor prototype. |
| @MessageHandler | A method annotation that identifies a method on a message processor prototype class (which must be annotated with @MessageProcessor) as one that handles a message. The annotated method must take a single argument whose class follows the Dempsy message requirements, and it can also return a message if desired. |
| @MessageKey | As described above, a Dempsy message class requires that one, and only one, method be annotated as the message key. |
| @Activation, @Passivation | When Dempsy migrates a message processor instance from one node to another, it first invokes the @Passivation-annotated method and then, on the other node, after clone-ing the message processor prototype, it provides the result of the @Passivation method to the method annotated with @Activation. In this way Dempsy allows the application to transfer state. |
| @Output | Dempsy can make calls on message processors on a schedule rather than only in response to incoming data. Methods annotated with @Output are invoked at these scheduled times, and the data they return, if a compliant Dempsy message, is passed on. |