From: adam Date: Sun, 8 Jul 2007 22:52:20 +0000 (+0000) Subject: improve ThreadPool X-Git-Url: http://git.megacz.com/?p=org.ibex.util.git;a=commitdiff_plain;h=55f0d8b05ba8b6ef036f31eb899813d8eaa4bc5a improve ThreadPool darcs-hash:20070708225220-5007d-02566049d8f1813b8bd18e27a5e73fb17e34cf5e.gz --- diff --git a/src/org/ibex/util/ThreadPool.java b/src/org/ibex/util/ThreadPool.java index 30e490f..16eeb83 100644 --- a/src/org/ibex/util/ThreadPool.java +++ b/src/org/ibex/util/ThreadPool.java @@ -6,64 +6,88 @@ package org.ibex.util; import java.io.*; -/** A simple thread pool. */ +/** + * A thread pool. + * + * - Attempting to add a task will block until the task is assigned to + * a thread. + * - If a task has not been assigned after MINIMUM_DISPATCH_DELAY, a new + * thread (above and beyond the ThreadPool size) will be created to + * handle it. + * - Whenever at least minIdleThreads are idle and more than minThreads + * threads exist overall, any threads which complete their task will + * be allowed to die. + * + */ public final class ThreadPool { - private final Queue tasks; - private final Queue idle; - private final int minThreads; - private final int maxThreads; - private int numThreads; + private static final int MINIMUM_DISPATCH_DELAY = 1000; + private static final int minIdleThreads = 3; - public ThreadPool(int min, int max) { - this.minThreads = min; - this.maxThreads = max; - this.idle = new Queue(Math.min(min, 100)); - this.tasks = new Queue(Math.min(max, 100)); - } + private final int minThreads; + private int numThreads; + private final PooledThread[] idleThreads; + private int numIdleThreads; - public synchronized void appendTask(Runnable r) { - if (idle.size() > 0) ((PooledThread)idle.remove()).setTask(r); - else if (numThreads < maxThreads) new PooledThread(r); - else tasks.append(r); + public ThreadPool(int minThreads) { + this.minThreads = minThreads; + this.numThreads = 0; + this.numIdleThreads = 0; + this.idleThreads = new PooledThread[minThreads]; + for(int i=0; i 0) ((PooledThread)idle.remove()).setTask(r); - else if (numThreads < maxThreads) new PooledThread(r); - else tasks.prepend(r); + public synchronized void start(Runnable r) { + /* if no idle threads, wait for MINIMUM_DISPATCH_DELAY or until notified */ + if (numIdleThreads == 0) try { + this.wait(MINIMUM_DISPATCH_DELAY); + } catch (Exception e) { Log.error(this, e); } + + /* if there are idle threads, use one */ + if (numIdleThreads > 0) { + numIdleThreads--; + idleThreads[numIdleThreads].start(r); + idleThreads[numIdleThreads] = null; + return; + } + + /* otherwise, create a new thread */ + new PooledThread().start(r); } private class PooledThread extends Thread { - private Runnable task = null; - private boolean die = false; - public PooledThread(Runnable r) { - numThreads++; - if (r!=null) setTask(r); + private Runnable runnable = null; + public PooledThread() { + synchronized(ThreadPool.this) { + if (numIdleThreads >= idleThreads.length) + throw new Error("this should not happen"); + numThreads++; + idleThreads[numIdleThreads++] = this; + } start(); } - public synchronized boolean isRunning() { return task != null; } - public synchronized boolean setTask(Runnable r) { - if (task != null) throw new Error("setTask() on a thread that is alread busy!"); - task = r; - notifyAll(); - return true; - } - private synchronized void recycle() { - task = null; - // depends on the fact that our Queue class is synchronized - if (tasks.size() > 0) task = (Runnable)tasks.remove(); - if (task==null && idle.size() < minThreads) idle.append(this); - else if (task==null) die = true; + public synchronized void start(Runnable runnable) { + if (this.runnable != null) + throw new Error("start() on a thread that is alread busy!"); + this.runnable = runnable; + notify(); } public void run() { try { - while (!die) { + while(true) { try { - synchronized(this) { while (task==null) wait(); } - task.run(); + synchronized(this) { while (runnable==null) wait(); } + runnable.run(); } catch (Exception e) { Log.error(this, e); } - recycle(); + runnable = null; + synchronized(ThreadPool.this) { + /* if the idle array is full, just let ourselves die */ + if (numIdleThreads > minIdleThreads && numThreads > minThreads) return; + /* otherwise put ourselves back in the pool and release anybody who is waiting */ + idleThreads[numIdleThreads++] = this; + ThreadPool.this.notifyAll(); + } } } finally { numThreads--;