Wednesday, March 21, 2012

Threading stories: about robust thread pools

Another post in my threading series. This time it's about thread pools, and robust thread pool settings in particular. In Java, thread pools are implemented by the ThreadPoolExecutor class, introduced in Java 5. The Javadoc of that class is very well organized, so I'll spare myself the effort of giving a general introduction here. Basically, ThreadPoolExecutor creates and manages threads that process Runnable tasks submitted to a work queue by an arbitrary client. It's a mechanism for performing work asynchronously, which is an important capability in times of multi-core machines and cloud computing.

To be useful in a wide range of contexts, ThreadPoolExecutor provides some adjustable parameters. This is nice, but it also leaves it to us, the developers, to choose the right settings for our concrete cases. Here is the largest constructor for a ThreadPoolExecutor.
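As a minimal sketch, the seven-argument constructor can be called like this. The class name ConstructorDemo and all concrete values (2 core threads, 4 maximum threads, 30 seconds keep-alive, a queue of 100) are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {

    // Builds a pool with the constructor that takes all seven parameters.
    // All values below are made up for illustration.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                 // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100), // workQueue
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler (saturation policy)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

The shorter constructors of ThreadPoolExecutor fill in a default thread factory and the AbortPolicy handler.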


Thread pool types

Some of the parameters shown in the constructor above are very sensitive in terms of resource consumption and resulting system stability. Based on the different parameter settings of the constructor, it's possible to distinguish some fundamental categories of thread pools. Here are some default thread pool settings offered by the Executors class.
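The following sketch spells out the settings behind Executors.newCachedThreadPool() and Executors.newFixedThreadPool(n) as plain constructor calls. The class and method names (DefaultPools, cached, fixed) are hypothetical helpers for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DefaultPools {

    // Same settings as Executors.newCachedThreadPool():
    // no core threads, unbounded thread count, 60 seconds idle timeout.
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Same settings as Executors.newFixedThreadPool(nThreads):
    // fixed thread count, unbounded work queue.
    static ThreadPoolExecutor fixed(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor cached = cached();
        ThreadPoolExecutor fixed = fixed(50);
        System.out.println("cached max threads: " + cached.getMaximumPoolSize());
        System.out.println("fixed queue capacity: " + fixed.getQueue().remainingCapacity());
        cached.shutdown();
        fixed.shutdown();
    }
}
```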


In the "cached thread pool" the number of threads is unbounded. This is caused by a maximumPoolSize of Integer.MAX_VALUE in conjunction with a SynchronousQueue. If you submit tasks in a burst to that thread pool, it will likely create a thread for every single task. In this scenario, created threads terminate after they have been idle for 60 seconds. The second example shows a "fixed thread pool", where maximumPoolSize is set to a specific fixed value. The pool's thread count will never exceed this value. If tasks arrive in a burst and all threads are busy, they queue up in the work queue (here an unbounded LinkedBlockingQueue). Threads in this fixed thread pool never die. The drawback of unbounded pools is obvious: both settings can cause JVM memory trouble (you get OutOfMemoryErrors, if you're lucky).

Let's look at some bounded thread pool settings:
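Here is a sketch of two bounded variants, assuming a cached pool capped at 50 threads and a fixed pool of 50 threads with a queue of 100000 tasks, both with CallerRunsPolicy as the saturation policy. The class and method names (BoundedPools, boundedCached, boundedFixed) are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPools {

    // Cached pool: threads are created on demand, but never more than 50.
    static ThreadPoolExecutor boundedCached() {
        return new ThreadPoolExecutor(0, 50,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Fixed pool: 50 threads and a work queue limited to 100000 tasks.
    static ThreadPoolExecutor boundedFixed() {
        return new ThreadPoolExecutor(50, 50,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(100000),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor cached = boundedCached();
        ThreadPoolExecutor fixed = boundedFixed();
        System.out.println("cached max threads: " + cached.getMaximumPoolSize());
        System.out.println("fixed queue capacity: " + fixed.getQueue().remainingCapacity());
        cached.shutdown();
        fixed.shutdown();
    }
}
```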


The first snippet creates a cached thread pool with the number of threads bounded to 50. If tasks arrive in a burst and all the threads are busy, a call to the ThreadPoolExecutor.execute() method would by default be rejected with a RejectedExecutionException. Often this isn't what I want, so I change the saturation policy by setting the rejected-execution handler to CallerRunsPolicy. This policy pushes the work back to the caller. That is, the client thread that issued the task for asynchronous execution will now run the task synchronously. You can develop your own saturation policy by implementing your own RejectedExecutionHandler. The second snippet creates a fixed thread pool with 50 threads and a work queue that is bounded to 100000 tasks. If the work queue is full, the saturation policy pushes the work back to the client. The cached pool creates threads on demand and terminates them if they idle for 60 seconds. The fixed pool keeps the threads alive.
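To see the caller-runs behavior in action, the following sketch saturates a deliberately tiny pool (one thread, no queue capacity) and records which thread ends up running the overflow task. The class CallerRunsDemo and its method name are hypothetical, and the pool size of 1 is chosen only to make saturation easy to provoke.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Saturates a one-thread pool and returns the name of the thread
    // that executed the task the pool could not accept.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // The first task occupies the only worker thread until released.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // The pool is saturated now, so CallerRunsPolicy runs this task
            // synchronously on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:    " + Thread.currentThread().getName());
        System.out.println("overflow ran on:  " + threadThatRanOverflowTask());
    }
}
```

Because the overflow task runs on the submitting thread, the client is slowed down while the pool is saturated, which acts as a simple form of back-pressure.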

Thread pool boundaries

As shown above, there are two basic approaches to defining thread pools: bounded and unbounded. Unbounded thread pools, like the default ones of the Executors class, work fine as long as you don't submit too many tasks in a burst. If that happens, unbounded thread pools can harm your system's stability: either too many threads are created by a cached thread pool, or too many tasks are queued in a fixed thread pool. The latter is more difficult to achieve, but still possible. For production use it may be better to set the boundaries to some meaningful values, as in the last two thread pool settings. Because it can be tricky to define those "meaningful boundaries", I have developed a little program that does that work for me.


The program will find ideal thread pool boundaries for the maximum capacity of your work queue and the required thread count. The algorithms are based on the work of Brian Goetz and Dr. Heinz Kabutz; you can find the references in the Javadoc. Calculating the capacity required by your work queue in a fixed thread pool is relatively simple. All you need is the desired target size of the work queue in bytes divided by the average size of your submitted tasks in bytes. Unfortunately, calculating the maximum thread count isn't an exact science. However, if you use the formulas in the program, you avoid the harmful extremes of work queues that are too large and too many threads. The ideal pool size depends on the wait time to compute time ratio of your task. The more wait time, the more threads are required to achieve a given utilization. The PoolSizeCalculator requires the desired target utilization and the desired maximum work queue memory consumption as input. Based on an investigation of object sizes and CPU time, it returns the ideal settings for maximum thread count and work queue capacity in the thread pool.
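The two calculations described above can be sketched as follows. This is not the PoolSizeCalculator itself, only the bare arithmetic under the assumption that task size, wait time and compute time have already been measured; the class PoolSizeFormulas and the numbers in main are made up for illustration.

```java
class PoolSizeFormulas {

    // Queue capacity (in tasks) = target queue memory / average task size, both in bytes.
    static long queueCapacity(long targetQueueBytes, long averageTaskBytes) {
        return targetQueueBytes / averageTaskBytes;
    }

    // Thread count = CPUs * target utilization * (1 + wait time / compute time).
    static double threadCount(int cpus, double targetUtilization,
                              long waitNanos, long computeNanos) {
        return cpus * targetUtilization * (1 + (double) waitNanos / computeNanos);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Hypothetical measurements: 64-byte tasks, 300 ms waiting per 100 ms computing.
        System.out.println("queue capacity: " + queueCapacity(1_000_000L, 64L));
        System.out.println("thread count:   "
                + threadCount(cpus, 0.8, 300_000_000L, 100_000_000L));
    }
}
```

The thread count comes out as a fractional number; how to round it is a judgment call, which is one reason the result is a guideline rather than an exact answer.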

Let's go through an example. The following snippet shows how to use the PoolSizeCalculator in a scenario of 1.0 (=100%) desired utilization and 100000 bytes maximum work queue size.


MyPoolSizeCalculator extends the abstract PoolSizeCalculator. You need to implement three template methods: getCurrentThreadCPUTime, createTask and createWorkQueue. The snippet uses standard Java Management Extensions (JMX) for the CPU time measurement (line 13). If JMX isn't accurate enough, other frameworks can be considered (e.g. the SIGAR API). Thread pools work best when tasks are homogeneous and independent. Therefore the createTask method creates an instance of a single type of Runnable task (line 17). This task will be investigated to calculate the wait time to CPU time ratio. Finally, I need to create an instance of the work queue to calculate the memory usage of submitted tasks (line 21). The output of the program shows the ideal settings for the work queue capacity and the maximum pool size (number of threads). These are the results for my exemplary I/O-intensive AsynchronousTask on a dual-core machine.

Target queue memory usage (bytes): 100000
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 2
Target utilization: 1.0
Elapsed time (nanos): 3000000000
Compute time (nanos): 906250000
Wait time (nanos): 2093750000
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000)
* Optimal thread count: 6.0
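Elapsed, compute and wait times like the ones in this output can be approximated with the JMX thread bean. The sketch below measures a single run of a task on the current thread; it is a simplification (no warm-up, no repeated executions over a test interval), and the class WaitComputeProbe is a hypothetical helper, not part of the PoolSizeCalculator.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

class WaitComputeProbe {

    // Runs the task once on the current thread and returns
    // {elapsedNanos, cpuNanos, waitNanos}, where wait = elapsed - cpu.
    static long[] measure(Runnable task) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (!threads.isCurrentThreadCpuTimeSupported()) {
            throw new UnsupportedOperationException("JVM does not report per-thread CPU time");
        }
        long cpuStart = threads.getCurrentThreadCpuTime();
        long wallStart = System.nanoTime();
        task.run();
        long elapsed = System.nanoTime() - wallStart;
        long cpu = threads.getCurrentThreadCpuTime() - cpuStart;
        return new long[] {elapsed, cpu, elapsed - cpu};
    }

    public static void main(String[] args) {
        // Stand-in for an I/O bound task: mostly waiting, very little computing.
        long[] result = measure(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("Elapsed time (nanos): " + result[0]);
        System.out.println("Compute time (nanos): " + result[1]);
        System.out.println("Wait time (nanos):    " + result[2]);
    }
}
```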

The 'recommended queue capacity' and the 'optimal thread count' are the important values. An ideal setting for my AsynchronousTask would be as follows:
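A sketch of such a setting, assuming a fixed pool of 6 threads, a work queue bounded to 2500 tasks and CallerRunsPolicy as the saturation policy. The class name RecommendedPool is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RecommendedPool {

    // Fixed pool sized from the calculator output:
    // 6 threads, at most 2500 queued tasks, overflow runs on the caller.
    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(6, 6,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2500),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        System.out.println("threads=" + pool.getMaximumPoolSize()
                + ", queue capacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```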


Using those settings, your work queue cannot grow larger than the desired 100000 bytes. And since the desired utilization is 1.0 (100%), it does not make sense to make the pool larger than 6 threads (wait time is roughly 2.3 times compute time in this measurement; the reported count of 6 threads on two CPUs means three threads per CPU, i.e. about two wait intervals for each compute interval). The results of the program largely depend on the type of tasks you process. If the tasks are homogeneous and compute intensive, the program will likely recommend setting the pool size to the number of available CPUs. But if the task has wait time, as I/O-intensive tasks do, the program will recommend increasing the thread count to achieve 100% utilization. Also note that some tasks change their wait time to compute time ratio after some time of processing, e.g. if the file size of an I/O operation increases. This suggests developing a self-tuning thread pool (one of my follow-up posts). In any case, you should make the thread pool sizes configurable, so that you can adjust them at runtime.
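Adjusting the size at runtime is possible because ThreadPoolExecutor exposes setCorePoolSize and setMaximumPoolSize. The sketch below is one way to do it for a fixed pool; the class ResizablePool and the system property name pool.size are assumptions for illustration. The order of the two setter calls matters, because the setters reject a maximum below the core size (and, on recent JDKs, a core size above the maximum).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ResizablePool {

    // Applies a new size to a pool whose core and maximum sizes are kept equal.
    // newSize must be at least 1.
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize > pool.getMaximumPoolSize()) {
            // Growing: raise the maximum first so core never exceeds maximum.
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            // Shrinking: lower the core size first for the same reason.
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }

    public static void main(String[] args) {
        // Hypothetical property name; falls back to 6 if it is not set.
        int configuredSize = Integer.getInteger("pool.size", 6);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(6, 6,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2500));
        resize(pool, configuredSize);
        System.out.println("pool size now: " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Note that the capacity of a LinkedBlockingQueue cannot be changed after construction, so only the thread counts are adjustable this way.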

OK, that's all in terms of robust thread pools for now. I hope you enjoyed it a little. And don't blame me if the formula isn't 100% accurate in terms of maximum pool size. As I said, it's not an exact science, it's about getting an idea of the ideal pool size.

13 comments:

  1. Good one. Surprisingly, thread pools have been around for a long time, but many programmers still fall short of using them and don't use them very frequently; I hope this article helps them understand more. Another useful thread-related feature is the JDK 7 fork/join framework, which can help you take advantage of multi-core processors.

  2. Cool! BTW, Little's Law is also related. So is the M/M/1 queuing model.

  3. I didn't quite understand the fixed testtime of 3000 and its relation to wait time.

  4. Testtime - CPU Time during Testtime = Wait time

    Replies
    1. Hi, can you kindly tell me how to set the EPSYLON value? When I run my own task, it always throws the "Test not accurate" exception. Is that because this value is not appropriate? And is there any relationship between the task execution time and the EPSYLON value?

    2. Just a very simple task:
      public class Task implements Runnable {

          public Task() {}

          public void run() {
              String str = "";
              try {
                  for (int i = 0; i < 2; i++) {
                      if (Thread.interrupted()) {
                          break;
                      }
                      str = str + 't';
                      Thread.sleep(100);
                  }
                  System.out.println(str);
              } catch (InterruptedException e) {
                  return;
              }
          }
      }

    3. Try making the sleep 1 millisecond ...
      You need to keep the average duration of your task below EPSYLON -> average task time < EPSYLON. The purpose is to guarantee a minimum number of executions. The test also fails if the system is under very high load (then the test is inaccurate).

  5. Niklas, I didn't get what the "warm-up" is and why the task should be started three times in a row?

  6. Hi, I just want to make sure that I don't measure JIT compilation CPU usage. Therefore I warm up the same task before I enter the test interval. Cheers, Niklas

  7. An impressive share! I have just forwarded this onto a coworker who has been doing a little research on this.
    And he in fact ordered me breakfast because I found it for him.
    .. lol. So let me reword this.... Thanks for the meal!
    ! But yeah, thanx for spending time to discuss this topic
    here on your website.
    My web site: http://www.bradmcallister.com/posts/we-all-appear-to-still-be-alive/

  8. Hi, I want to use a thread pool in the program below. How can I do that?
    import java.util.*;

    public class MaxDemo2
    {
        public static void main(String[] args)
        {
            try{
                MaxSquare ms = new MaxSquare(4, 5);
                ms.dumpMatrix();
                ms.findSquares();
                System.out.println("total squares found : " + MaxSquare.cnts);
            }catch(Exception e){}
        }

    }

    class MaxSquare implements Runnable {

        Thread t[];
        static int cnts;

        class Square {
            int size;
            int r1;
            int r2;
            int c1;
            int c2;

            public Square(int x1, int x2, int y1, int y2) {
                r1 = x1; r2 = x2;
                c1 = y1; c2 = y2;
                size = (r2-r1+1)*(c2-c1+1);
                cnts = 0;
            }
        }

        protected int[][] a;
        int rows;
        int cols;
        int i,j;
        int S[][];

        public MaxSquare(int r, int c) {
            try{
                rows = r;
                cols = c;
                a = new int[r][c];
                initMatrix();

            }catch(Exception e){}
        }

        protected void initMatrix() {

            a[0][0] = 1;
            a[0][1] = 1;
            a[0][2] = 0;
            a[0][3] = 0;
            a[0][4] = 1;
            a[1][0] = 1;
            a[1][1] = 1;
            a[1][2] = 1;
            a[1][3] = 1;
            a[1][4] = 0;
            a[2][0] = 1;
            a[2][1] = 1;
            a[2][2] = 0;
            a[2][3] = 1;
            a[2][4] = 1;
            a[3][0] = 1;
            a[3][1] = 1;
            a[3][2] = 0;
            a[3][3] = 0;
            a[3][4] = 1;

        }

        public void dumpMatrix() {
            for (i=0; i<rows; i++) {
                for (j=0; j<cols; j++) {
                    System.out.print(a[i][j]);
                }
                System.out.println();
            }
        }

        protected void findSquares()
        {
            try{
                t = new Thread[rows];

                S = new int[rows][cols];
                for(i=0;i<rows;i++)
                {
                    for(j=0;j<cols;j++)
                        S[i][j] = a[i][j];

                    t[i] = new Thread(this,"demo");
                }

                cnts=0;

                for(i = 1; i < rows; i++)
                {
                    t[i].run();
                }

            }catch(Exception e){}

        }

        public void run()
        {
            try{

                for(j = 1; j < cols; j++)
                {
                    if(a[i][j] == 1)
                    {

                        S[i][j] = Math.min(Math.min(S[i][j-1], S[i-1][j]), S[i-1][j-1]) + 1;
                        cnts++;
                    }
                    //System.out.println(j);
                }

            }catch(Exception e){}
        }

    }
