Using Terracotta To Cluster a Single JVM Master/Worker Application

October 5, 2007

I developed a batch processing system for my company that follows the Master/Worker design pattern. Of course, I did this before things like Spring Batch or Open Data Grid were available, and it now works well enough (robust, task retry, generic workers, generic task partitioning, etc.) that it is hard for me to justify ripping it out and replacing it with one of the open source alternatives now available. However, I'd still like to take the system to the next level and distribute tasks among multiple JVMs (and machines) rather than keeping everything in a single multi-threaded JVM. I've done test implementations in the past using Spring and JMS, and those worked just fine, but this blog inspired me to give Terracotta a try.

The basic approach I took when developing our batch processing framework was to use the Java 1.5 concurrency package, especially the ExecutorService, to simplify the concurrency and threading issues inherent in a parallel processing framework. The framework uses four basic components: a Task, a TaskPartitioner, a TaskExecutor, and a TaskMaster.

public interface Task {
...
}

public interface TaskPartitioner {
   /**
    * Splits the parent task into sub-tasks.
    */
   public List<Task> getTasks(Task parentTask) throws PartitionException;
}

public interface TaskExecutor extends Runnable {
   /**
    * Execute a single task.  
    * This task should be a leaf-node that is
    * currently in an executable state.
    * The task will be treated as an independent
    * unit-of-work and any transactional operations will
    * be committed upon completion of task execution
    */
   public void executeTask(Task task) throws BatchException;
}

public interface TaskMaster {
   /**
    * This method takes a root task, partitions it into sub-tasks,
    * and then executes those sub-tasks when they enter
    * an executable state.  
    */
   public void runTask(Task rootTask) throws BatchException;
}

The basic idea is that a client submits a Task and a TaskPartitioner, which are processed by the batch framework. The framework splits the Task into sub-tasks and may continue splitting those sub-tasks until it gets to an executable child. (I'll write about the TaskIterator in another post, but I've sketched its contract after the code below.) As Tasks become executable they are submitted to the ExecutorService. Here is a very basic implementation of the TaskMaster interface that takes a root task and submits its sub-tasks to the ExecutorService when they are in an executable state.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleTaskMaster implements TaskMaster {
   private final ExecutorService executorService = Executors.newFixedThreadPool(5);

   public void runTask(Task rootTask) throws BatchException {
       TaskIterator taskIterator = new TaskIterator(rootTask);
       while (taskIterator.hasNext()) {
           Task task = taskIterator.next();
           if (task == null) {
               // tasks are waiting to execute but are blocked
               // by other tasks, most likely serial dependencies,
               // so back off briefly and check again
               try {
                  Thread.sleep(100);
               } catch (InterruptedException ie) {
                  Thread.currentThread().interrupt();
                  return;
               }
               continue;
           }
           executorService.execute(new BasicTaskExecutor(task));
       }
   }
}
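
The TaskIterator deserves its own post, but here is a sketch of the contract SimpleTaskMaster relies on. The method bodies are stubs; only the semantics matter here:

public class TaskIterator {
   private final Task rootTask;

   public TaskIterator(Task rootTask) {
      this.rootTask = rootTask;
   }

   /** True while any sub-task of the root remains un-executed. */
   public boolean hasNext() {
      throw new UnsupportedOperationException("sketch only -- see future post");
   }

   /**
    * Returns the next executable leaf task, or null if all remaining
    * tasks are currently blocked (e.g. by serial dependencies).
    */
   public Task next() {
      throw new UnsupportedOperationException("sketch only -- see future post");
   }
}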

Now that the basic framework is defined, we can take it to the next level by replacing SimpleTaskMaster with a slightly modified version that enables the introduction of Terracotta in the next step. By replacing the direct use of the ExecutorService with a BlockingQueue, we can distribute the queue and have our workers access it from the same or a different JVM using Terracotta's clustering capabilities.
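
The WorkQueue interface itself is a thin abstraction over the queue; its two methods follow directly from the implementation below:

public interface WorkQueue {
   /** Returns the next executable task, blocking until one is available. */
   Task getWork() throws InterruptedException;

   /** Adds an executable task for a worker to pick up. */
   void addWork(Task executableTask);
}

And here is a simple implementation backed by a java.util.concurrent BlockingQueue: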

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SimpleWorkQueue implements WorkQueue {
   private final BlockingQueue<Task> workQueue = new LinkedBlockingQueue<Task>();

   public Task getWork() throws InterruptedException {
      return workQueue.take(); // blocks if empty
   }

   public void addWork(Task executableTask) {
      try {
         workQueue.put(executableTask); // blocks if full
      } catch (InterruptedException ie) {
         // a robust implementation would re-queue or fail the task
         Thread.currentThread().interrupt();
      }
   }
}

BasicTaskMaster2 is just SimpleTaskMaster with the WorkQueue swapped in for the ExecutorService:

public class BasicTaskMaster2 implements TaskMaster {
   private final WorkQueue workQueue;

   public BasicTaskMaster2(WorkQueue workQueue) {
      this.workQueue = workQueue;
   }

   public void runTask(Task rootTask) throws BatchException {
       TaskIterator taskIterator = new TaskIterator(rootTask);
       while (taskIterator.hasNext()) {
           Task task = taskIterator.next();
           if (task == null) {
               // tasks are waiting to execute but are blocked
               // by other tasks, most likely serial dependencies,
               // so back off briefly and check again
               try {
                  Thread.sleep(100);
               } catch (InterruptedException ie) {
                  Thread.currentThread().interrupt();
                  return;
               }
               continue;
           }
           // instead of using the ExecutorService directly we add
           // tasks to the workQueue, which may or may not be
           // distributed across multiple JVMs. The nice part is that
           // this works in both clustered and non-clustered environments.
           workQueue.addWork(task);
       }
   }
}

OK, finally… time to use Terracotta to cluster this thing. Since I was already using Spring for DI, it made perfect sense to use it to help cluster the application, and doing so was surprisingly simple. First I had to create the Terracotta configuration that distributes the WorkQueue.

<tc:tc-config xmlns:tc="http://www.terracotta.org/config"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://www.terracotta.org/config http://www.terracotta.org/schema/terracotta-4.xsd">
    <servers>
        <server host="%i" name="sample"/>
    </servers>
    <clients>
        <logs>%(user.home)/terracotta/client-logs/spring/coordination/%D</logs>
    </clients>
    <application>
        <spring>
            <jee-application name="*">
                <instrumented-classes>
                    <include>
                        <class-expression>com.quantumretail.qlogic.batch.Task</class-expression>
                    </include>
                </instrumented-classes>

                <locks>
                    <autolock>
                        <method-expression>* *..*.*(..)</method-expression>
                    </autolock>
                </locks>

                <application-contexts>
                    <application-context> 
                        <paths>
                            <path>*-context.xml</path>
                        </paths>
                        <beans>
                            <bean name="workQueue"/>
                        </beans>
                    </application-context>
                </application-contexts>
            </jee-application>
        </spring>
    </application>
</tc:tc-config>

The XML above basically tells Terracotta to distribute the Spring bean named workQueue. I've oversimplified a bit, because you will also need to tell Terracotta to instrument any classes referenced by your Task implementations, since those objects will be distributed as well. The Spring configuration itself is pretty simple: just define the workQueue bean and inject it into the BasicTaskMaster2 bean.
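
For reference, a minimal master-context.xml might look like the following (a sketch: the package and bean names are assumed to follow the examples above, and the file name matches the <paths> pattern in the Terracotta config):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

    <!-- the bean Terracotta distributes; the id must match the
         <bean name="workQueue"/> entry in tc-config -->
    <bean id="workQueue" class="com.quantumretail.qlogic.batch.SimpleWorkQueue"/>

    <!-- the master, with the (possibly clustered) queue injected -->
    <bean id="taskMaster" class="com.quantumretail.qlogic.batch.BasicTaskMaster2">
        <constructor-arg ref="workQueue"/>
    </bean>
</beans>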

Now that the Master side of the Master/Worker pair is ready for clustering, we have to define the Worker. The Worker is designed to run in one or more JVMs and simply sits and waits for Tasks to become available; when one is, it executes it and then looks for the next task.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleTaskExecutor implements TaskExecutor {
   private final WorkQueue workQueue;
   // workers can be multi-threaded too; transient keeps the executor local
   private transient ExecutorService executorService =
         Executors.newFixedThreadPool(5);

   public SimpleTaskExecutor(final WorkQueue workQueue) {
      // the injected workQueue may be a local or a clustered
      // object; it doesn't matter to this class
      this.workQueue = workQueue;
   }

   public void start() {
      while (!Thread.currentThread().isInterrupted()) {
         try {
            final Task task = workQueue.getWork(); // blocks until work arrives
            executorService.execute(new Runnable() {
               public void run() {
                  try {
                     executeTask(task);
                  } catch (BatchException be) {
                     // a real worker would log, retry, or fail the task here
                  }
               }
            });
         } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
         }
      }
   }

   public void run() { start(); } // TaskExecutor extends Runnable

   public void executeTask(Task task) throws BatchException {
      // application-specific unit-of-work execution
   }
}
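
The worker's Spring configuration mirrors the master's. A minimal worker-context.xml sketch (again, class and package names are assumptions based on the examples above; the workQueue bean name must match the one Terracotta distributes):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">

    <!-- same clustered bean name as in master-context.xml -->
    <bean id="workQueue" class="com.quantumretail.qlogic.batch.SimpleWorkQueue"/>

    <bean id="simpleTaskExecutor" class="com.quantumretail.qlogic.batch.SimpleTaskExecutor">
        <constructor-arg ref="workQueue"/>
    </bean>
</beans>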

At this point the clustered framework is ready to run. So, with the Terracotta server running, we start the Master JVM, which adds work to the workQueue.

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Master {
   public static void main(String[] args) {
      // loading the context wires up the (clustered) workQueue
      // and the BasicTaskMaster2 bean that feeds it
      new ClassPathXmlApplicationContext("master-context.xml");
   }
}

And then we run one to n workers.

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Worker {
   public static void main(String[] args) {
      ApplicationContext ctx =
          new ClassPathXmlApplicationContext("worker-context.xml");
      SimpleTaskExecutor taskExecutor =
          (SimpleTaskExecutor) ctx.getBean("simpleTaskExecutor");
      taskExecutor.start(); // loops forever, pulling tasks off the queue
   }
}
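
Assuming a stock Terracotta 2.x distribution (the script names come from its bin directory; the classpath details here are illustrative), starting everything up looks roughly like this:

# start the Terracotta server
$TC_HOME/bin/start-tc-server.sh -f tc-config.xml &

# start the master under the Terracotta client runtime
# (a plain 'java' launch would not be clustered)
$TC_HOME/bin/dso-java.sh -Dtc.config=tc-config.xml -cp classes Master

# start one to n workers, on this machine or any other
$TC_HOME/bin/dso-java.sh -Dtc.config=tc-config.xml -cp classes Worker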

Now we have a clustered Master/Worker application that is 'Ready for Work'. Obviously I've skipped a ton of details, and as described above the application would not yet be a robust, fault-tolerant batch processing framework, but it should give you an idea of how easy it is to take a single-JVM application and turn it into a clustered application with a little refactoring and a little XML.