import java.io.*;
-/** A simple thread pool. */
+/**
+ * A thread pool.
+ *
+ * - Attempting to add a task will block until the task is assigned to
+ * a thread.
+ * - If a task has not been assigned after MINIMUM_DISPATCH_DELAY, a new
+ * thread (above and beyond the ThreadPool size) will be created to
+ * handle it.
+ * - Whenever at least minIdleThreads are idle and more than minThreads
+ * threads exist overall, any threads which complete their task will
+ * be allowed to die.
+ *
+ */
public final class ThreadPool {
- private final Queue tasks;
- private final Queue idle;
- private final int minThreads;
- private final int maxThreads;
- private int numThreads;
+ private static final int MINIMUM_DISPATCH_DELAY = 1000;
+ private static final int minIdleThreads = 3;
- public ThreadPool(int min, int max) {
- this.minThreads = min;
- this.maxThreads = max;
- this.idle = new Queue(Math.min(min, 100));
- this.tasks = new Queue(Math.min(max, 100));
- }
+ private final int minThreads;
+ private int numThreads;
+ private final PooledThread[] idleThreads;
+ private int numIdleThreads;
- public synchronized void appendTask(Runnable r) {
- if (idle.size() > 0) ((PooledThread)idle.remove()).setTask(r);
- else if (numThreads < maxThreads) new PooledThread(r);
- else tasks.append(r);
+ public ThreadPool(int minThreads) {
+ this.minThreads = minThreads;
+ this.numThreads = 0;
+ this.numIdleThreads = 0;
+ this.idleThreads = new PooledThread[minThreads];
+ for(int i=0; i<this.idleThreads.length; i++)
+ this.idleThreads[i] = new PooledThread();
}
- public synchronized void prependTask(Runnable r) {
- if (idle.size() > 0) ((PooledThread)idle.remove()).setTask(r);
- else if (numThreads < maxThreads) new PooledThread(r);
- else tasks.prepend(r);
+ public synchronized void start(Runnable r) {
+ /* if no idle threads, wait for MINIMUM_DISPATCH_DELAY or until notified */
+ if (numIdleThreads == 0) try {
+ this.wait(MINIMUM_DISPATCH_DELAY);
+ } catch (Exception e) { Log.error(this, e); }
+
+ /* if there are idle threads, use one */
+ if (numIdleThreads > 0) {
+ numIdleThreads--;
+ idleThreads[numIdleThreads].start(r);
+ idleThreads[numIdleThreads] = null;
+ return;
+ }
+
+ /* otherwise, create a new thread */
+ new PooledThread().start(r);
}
private class PooledThread extends Thread {
- private Runnable task = null;
- private boolean die = false;
- public PooledThread(Runnable r) {
- numThreads++;
- if (r!=null) setTask(r);
+ private Runnable runnable = null;
+ public PooledThread() {
+ synchronized(ThreadPool.this) {
+ if (numIdleThreads >= idleThreads.length)
+ throw new Error("this should not happen");
+ numThreads++;
+ idleThreads[numIdleThreads++] = this;
+ }
start();
}
- public synchronized boolean isRunning() { return task != null; }
- public synchronized boolean setTask(Runnable r) {
- if (task != null) throw new Error("setTask() on a thread that is alread busy!");
- task = r;
- notifyAll();
- return true;
- }
- private synchronized void recycle() {
- task = null;
- // depends on the fact that our Queue class is synchronized
- if (tasks.size() > 0) task = (Runnable)tasks.remove();
- if (task==null && idle.size() < minThreads) idle.append(this);
- else if (task==null) die = true;
+ public synchronized void start(Runnable runnable) {
+ if (this.runnable != null)
+ throw new Error("start() on a thread that is alread busy!");
+ this.runnable = runnable;
+ notify();
}
public void run() {
try {
- while (!die) {
+ while(true) {
try {
- synchronized(this) { while (task==null) wait(); }
- task.run();
+ synchronized(this) { while (runnable==null) wait(); }
+ runnable.run();
} catch (Exception e) { Log.error(this, e); }
- recycle();
+ runnable = null;
+ synchronized(ThreadPool.this) {
+ /* if the idle array is full, just let ourselves die */
+ if (numIdleThreads > minIdleThreads && numThreads > minThreads) return;
+ /* otherwise put ourselves back in the pool and release anybody who is waiting */
+ idleThreads[numIdleThreads++] = this;
+ ThreadPool.this.notifyAll();
+ }
}
} finally {
numThreads--;