Friday, December 9, 2011

Java 7: Fork and join and the jam jar

Another Java 7 post, this time about the new concurrency utilities, in particular plain fork and join. Everything is explained with a straight code example. Compared to Project Coin, concurrency is an inherently complex topic, so the code example is a little more complex. Let's get started.

The Fork and Join Executor Service

Fork and join employs an efficient task-scheduling algorithm, known as "work stealing", that ensures optimized resource usage (memory and CPU) on multi-core machines. Idle threads in a fork join pool attempt to find and execute subtasks created by other active threads. This is very efficient because larger units of work get divided into smaller units that get distributed across all active threads (and CPUs). Here is an analogy to explain the strength of fork and join algorithms: if you fill a jam jar with ping-pong balls, there is a lot of air left in the glass. Think of the air as unused CPU resource. If you fill your jam jar with peas (or sand) instead, there is less air in the glass. Fork and join is like filling the jam jar with peas. You also fit more volume into the glass using peas, because there is less air (less waste). Fork and join algorithms always ensure an optimal (smaller) number of active threads than work-sharing algorithms, for the same "peas reason". Think of the jam jar as your thread pool and the peas as your tasks. With fork and join you can host more tasks (and more total volume) with the same number of threads (in the same jam jar).

Image 1: Fork and join in the jam jar

Here is a plain fork and join code example:



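The original listing did not survive in this copy of the post, so here is a minimal, hypothetical reconstruction based on the line-by-line notes below. The class and method names (PricingTask, PricingEngine.calculatePrices, Proposal) are assumptions standing in for the post's Proposal.java and PricingEngine.java, and the line numbering in the walkthrough refers to the original listing, not this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ProposalPricer {

    // Stand-in for the post's Proposal.java
    static class Proposal {
        final String id;
        Proposal(String id) { this.id = id; }
    }

    // Stand-in for the post's PricingEngine.java: one price map per proposal
    static class PricingEngine {
        static Map<String, Double> calculatePrices(Proposal p) {
            Map<String, Double> prices = new HashMap<>();
            prices.put(p.id, 100.0); // placeholder premium
            return prices;
        }
    }

    static class PricingTask extends RecursiveTask<List<Map<String, Double>>> {
        private final List<Proposal> proposals;

        PricingTask(List<Proposal> proposals) { this.proposals = proposals; }

        @Override
        protected List<Map<String, Double>> compute() {
            List<Map<String, Double>> results = new ArrayList<>();
            // Small enough? Then price directly in this thread.
            if (proposals.size() == 1) {
                results.add(PricingEngine.calculatePrices(proposals.get(0)));
                return results;
            }
            // Otherwise split the list and recurse.
            int mid = proposals.size() / 2;
            PricingTask f1 = new PricingTask(proposals.subList(0, mid));
            f1.fork(); // allow another worker thread to steal the first half
            PricingTask f2 = new PricingTask(proposals.subList(mid, proposals.size()));
            results.addAll(f2.compute()); // compute the second half in this thread
            results.addAll(f1.join());    // wait for the forked first half
            return results;
        }
    }

    public static void main(String[] args) {
        List<Proposal> proposals = new ArrayList<>();
        for (int i = 1; i <= 4; i++) proposals.add(new Proposal("proposal-" + i));
        ForkJoinPool pool = new ForkJoinPool();
        List<Map<String, Double>> prices = pool.invoke(new PricingTask(proposals));
        System.out.println(prices.size()); // one price map per proposal
    }
}
```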
Fork and join tasks all follow a similar, typical control flow. In my example I want to calculate the prices for a list of car insurance offers. Let's go through the example.

Line 10: Fork and join tasks extend RecursiveTask or RecursiveAction. Tasks return a result, actions don't. RecursiveTask lets you specify the return type using generics. The result of my example is a List of Maps that contain the prices for the car insurance covers: one map of prices for each proposal.
Line 12: The task will calculate prices for proposals.
Line 22: Fork and join tasks implement the compute method. Again, the compute method returns a list of maps that contain prices. If there are four proposals in the input list, then there will be four maps of prices.
Lines 24-26: Is the task's work (the list of proposals) small enough to compute directly? If yes, compute in this thread, which means calling the pricing engine to price the proposal. If no, continue: split the work and call the task recursively.
Line 31: Determine where to split the list.
Line 33: Create a new task for the first part of the split list.
Line 34: Fork that task: allow some other thread to perform that smaller subtask. That thread will call compute recursively on that subtask instance.
Line 35: Create a new task for the second part of the split list.
Line 36: Prepare the composed result list for the two divided subtasks (you need to compose the results of the two subtasks into a single result of the parent task).
Line 37: Compute the second subtask in this current thread and add the result to the result list.
Line 38: In the meantime the first subtask f1 was computed by some other thread. Join the result of the first subtask into the composed result list.
Line 39: Return the composed result.

You need to start the fork and join task.

Line 49: Create the main fork and join task with the initial list of proposals.
Line 53: Create a fork and join thread pool.
Line 55: Submit the main task to the fork and join pool.
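The submission steps above can be sketched as a tiny, self-contained example. This is not the pricing example from the post; it uses a toy summing task (SumTask is an assumed name) just to show the pool-creation and submission pattern, including that submit() hands back a future-like ForkJoinTask while invoke() would block and return the result directly.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SubmitDemo {

    // Toy RecursiveTask that sums the range [from, to), splitting large ranges.
    static class SumTask extends RecursiveTask<Long> {
        final long from, to;
        SumTask(long from, long to) { this.from = from; this.to = to; }

        @Override
        protected Long compute() {
            if (to - from <= 1000) { // small enough: sum directly
                long sum = 0;
                for (long i = from; i < to; i++) sum += i;
                return sum;
            }
            long mid = (from + to) / 2;
            SumTask left = new SumTask(from, mid);
            left.fork(); // another worker may steal the left half
            SumTask right = new SumTask(mid, to);
            return right.compute() + left.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(); // create the fork and join pool
        // submit() returns a ForkJoinTask (a Future); join() waits for the result.
        long sum = pool.submit(new SumTask(0, 10_000)).join();
        System.out.println(sum); // 49995000
    }
}
```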

That's it. You can look into the complete code here. You'll need the PricingEngine.java and the Proposal.java.





3 comments:

  1. It looks very similar to ExecutorService/Future. Why should I care then? There must be a difference!?

    ReplyDelete
  2. Never mind my question. It's answered in the Javadoc. Thanks for noticing me. ;-)

    ReplyDelete
  3. Nice, Niklas. I would probably have a threshold larger than size 1 for which I would execute the work in serial. Splitting the work into too fine-grained work chunks will make F/J execute slower in my experience.

    In ForkJoinTask, this is stated as: "As a very rough rule of thumb, a task should perform more than 100 and less than 10000 basic computational steps ..."

    Also, instead of returning "Map" from the computeDirectly() method, you could also define a simple class

    public class ProposalResult extends HashMap {}

    And furthermore,

    public class ProposalResultList extends ArrayList {}

    The code would IMO read much nicer that way.

    ReplyDelete