improve ThreadPool
[org.ibex.util.git] / src / org / ibex / util / ThreadPool.java
index 30e490f..16eeb83 100644 (file)
@@ -6,64 +6,88 @@ package org.ibex.util;
 
 import java.io.*;
 
-/** A simple thread pool. */
+/**
+ *   A thread pool.
+ * 
+ *   - Attempting to add a task will block until the task is assigned to
+ *     a thread.
+ *   - If a task has not been assigned after MINIMUM_DISPATCH_DELAY, a new
+ *     thread (above and beyond the ThreadPool size) will be created to
+ *     handle it.
+ *   - Whenever at least minIdleThreads are idle and more than minThreads
+ *     threads exist overall, any threads which complete their task will
+ *     be allowed to die.
+ *
+ */
 public final class ThreadPool {
 
-    private final Queue tasks;
-    private final Queue idle;
-    private final int   minThreads;
-    private final int   maxThreads;
-    private       int   numThreads;
+    private static final int MINIMUM_DISPATCH_DELAY = 1000;
+    private static final int minIdleThreads         = 3;
 
-    public ThreadPool(int min, int max) {
-        this.minThreads  = min;
-        this.maxThreads  = max;
-        this.idle    = new Queue(Math.min(min, 100));
-        this.tasks   = new Queue(Math.min(max, 100));
-    }
+    private final int            minThreads;
+    private       int            numThreads;
+    private final PooledThread[] idleThreads;
+    private       int            numIdleThreads;
 
-    public synchronized void appendTask(Runnable r) {
-        if (idle.size() > 0)              ((PooledThread)idle.remove()).setTask(r);
-        else if (numThreads < maxThreads) new PooledThread(r);
-        else                              tasks.append(r);
+    public ThreadPool(int minThreads) {
+        this.minThreads     = minThreads;
+        this.numThreads     = 0;
+        this.numIdleThreads = 0;
+        this.idleThreads    = new PooledThread[minThreads];
+        for(int i=0; i<this.idleThreads.length; i++)
+            this.idleThreads[i] = new PooledThread();
     }
 
-    public synchronized void prependTask(Runnable r) {
-        if (idle.size() > 0)              ((PooledThread)idle.remove()).setTask(r);
-        else if (numThreads < maxThreads) new PooledThread(r);
-        else                              tasks.prepend(r);
+    public synchronized void start(Runnable r) {
+        /* if no idle threads, wait for MINIMUM_DISPATCH_DELAY or until notified */
+        if (numIdleThreads == 0) try {
+            this.wait(MINIMUM_DISPATCH_DELAY);
+        } catch (Exception e) { Log.error(this, e); }
+
+        /* if there are idle threads, use one */
+        if (numIdleThreads > 0) {
+            numIdleThreads--;
+            idleThreads[numIdleThreads].start(r);
+            idleThreads[numIdleThreads] = null;
+            return;
+        }
+
+        /* otherwise, create a new thread */
+        new PooledThread().start(r);
     }
 
     private class PooledThread extends Thread {
-        private Runnable task = null;
-        private boolean die = false;
-        public PooledThread(Runnable r) {
-            numThreads++;
-            if (r!=null) setTask(r);
+        private Runnable runnable = null;
+        public PooledThread() {
+            synchronized(ThreadPool.this) {
+                if (numIdleThreads >= idleThreads.length)
+                    throw new Error("this should not happen");
+                numThreads++;
+                idleThreads[numIdleThreads++] = this;
+            }
             start();
         }
-        public synchronized boolean isRunning() { return task != null; }
-        public synchronized boolean setTask(Runnable r) {
-            if (task != null) throw new Error("setTask() on a thread that is alread busy!");
-            task = r;
-            notifyAll();
-            return true;
-        }
-        private synchronized void recycle() {
-            task = null;
-            // depends on the fact that our Queue class is synchronized
-            if (tasks.size() > 0)                       task = (Runnable)tasks.remove();
-            if (task==null && idle.size() < minThreads) idle.append(this);
-            else if (task==null)                        die = true;
+        public synchronized void start(Runnable runnable) {
+            if (this.runnable != null)
+                throw new Error("start() on a thread that is alread busy!");
+            this.runnable = runnable;
+            notify();
         }
         public void run() {
             try {
-                while (!die) {
+                while(true) {
                     try {
-                        synchronized(this) { while (task==null) wait(); }
-                        task.run();
+                        synchronized(this) { while (runnable==null) wait(); }
+                        runnable.run();
                     } catch (Exception e) { Log.error(this, e); }
-                    recycle();
+                    runnable = null;
+                    synchronized(ThreadPool.this) {
+                        /* if the idle array is full, just let ourselves die */
+                        if (numIdleThreads > minIdleThreads && numThreads > minThreads) return;
+                        /* otherwise put ourselves back in the pool and release anybody who is waiting */
+                        idleThreads[numIdleThreads++] = this;
+                        ThreadPool.this.notifyAll();
+                    }
                 }
             } finally {
                 numThreads--;