A tunable, extensible thread pool class. The main supported public
method is execute(Runnable command), which can be called instead of
directly creating threads to execute commands.
Thread pools can be useful for several, usually intertwined
reasons:
- To bound resource use. A limit can be placed on the maximum
number of simultaneously executing threads.
- To manage concurrency levels. A targeted number of threads
can be allowed to execute simultaneously.
- To manage a set of threads performing related tasks.
- To minimize overhead, by reusing previously constructed
Thread objects rather than creating new ones. (Note however
that pools are hardly ever cure-alls for performance problems
associated with thread construction, especially on JVMs that
themselves internally pool or recycle threads.)
These goals introduce a number of policy parameters that are
encapsulated in this class. All of these parameters have defaults
and are tunable, either via get/set methods, or, in cases where
decisions should hold across lifetimes, via methods that can be
easily overridden in subclasses. The main, most commonly set
parameters can be established in constructors. Policy choices
across these dimensions can and do interact. Be careful, and
please read this documentation completely before using! See also
the usage examples below.
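As an illustration of the first goal above (bounding resource use), here is a minimal sketch. It does not use PooledExecutor itself: it assumes the standard-library class java.util.concurrent.ThreadPoolExecutor as a stand-in, and the class name BoundedPoolSketch and method runTasks are hypothetical names chosen for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class BoundedPoolSketch {

    /** Runs taskCount short tasks on at most maxThreads threads and returns
     *  {tasks completed, highest number of tasks seen running at once}. */
    static int[] runTasks(int maxThreads, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        // Core size equals maximum size, with an unbounded queue:
        // extra tasks wait in the queue instead of getting new threads.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    completed.incrementAndGet();
                }
            });
        }

        pool.shutdown(); // accept no new tasks, finish the queued ones
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { completed.get(), peak.get() };
    }

    public static void main(String[] args) {
        int[] result = runTasks(4, 20);
        System.out.println("completed=" + result[0] + ", peak=" + result[1]);
    }
}
```

The observed peak can be lower than the bound on a lightly loaded machine, but it should never exceed it.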
Other plausible policies include raising the maximum pool size
after checking with some other objects that this is OK.
(These cases can never occur if the maximum pool size is unbounded
or the queue is unbounded. In these cases you instead face
potential resource exhaustion.) The execute method does not
throw any checked exceptions in any of these cases since any
errors associated with them must normally be dealt with via
handlers or callbacks. (Although in some cases, these might be
associated with throwing unchecked exceptions.) You may wish to
add special implementations even if you choose one of the listed
policies. For example, the supplied Discard policy does not
inform the caller of the drop. You could add your own version
that does so. Since choice of policies is normally a system-wide
decision, selecting a policy affects all calls to execute. If for
some reason you would instead like to make per-call decisions, you
could add variant versions of the execute method (for example,
executeIfWouldNotBlock) in subclasses.
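The idea of a discard policy that does inform someone of the drop can be sketched as follows. This is an assumption-laden illustration, not part of this class: it uses java.util.concurrent.ThreadPoolExecutor and its RejectedExecutionHandler interface as a stand-in for PooledExecutor's blocked-execution handlers, and CountingDiscardPolicy and saturateAndCountDrops are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class NotifyingDiscardSketch {

    /** Hypothetical policy: drop the task, but record that it was dropped. */
    static class CountingDiscardPolicy implements RejectedExecutionHandler {
        final AtomicInteger dropped = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            dropped.incrementAndGet(); // a fuller version might log or call back
        }
    }

    /** Offers four tasks to a pool with one thread and one queue slot,
     *  then returns how many of them the policy dropped. */
    static int saturateAndCountDrops() {
        CountingDiscardPolicy policy = new CountingDiscardPolicy();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the thread until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(blocker); // no room left: handed to the policy
            pool.execute(blocker); // no room left: handed to the policy
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return policy.dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped=" + saturateAndCountDrops());
    }
}
```

Because the policy object is installed on the pool, it applies to every call to execute, which matches the system-wide nature of the choice described above.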
Usage examples.
class MyPool {
// initialize to use a maximum of 8 threads.
static PooledExecutor pool = new PooledExecutor(8);
}
- Using a bounded buffer of 10 tasks, at least 4 threads (started only
when needed due to incoming requests), but allowing
up to 100 threads if the buffer gets full.
pool = new PooledExecutor(new BoundedBuffer(10), 100);
pool.setMinimumPoolSize(4);
- Same as the previous example, except pre-start 9 threads, allowing
them to die if they are not used for five minutes.
pool = new PooledExecutor(new BoundedBuffer(10), 100);
pool.setMinimumPoolSize(4);
pool.setKeepAliveTime(1000 * 60 * 5);
pool.createThreads(9);
- Same as the previous example, except clients block if both the
buffer is full and all 100 threads are busy:
pool = new PooledExecutor(new BoundedBuffer(10), 100);
pool.setMinimumPoolSize(4);
pool.setKeepAliveTime(1000 * 60 * 5);
pool.waitWhenBlocked();
pool.createThreads(9);
- An unbounded queue serviced by exactly 5 threads:
pool = new PooledExecutor(new LinkedQueue());
pool.setKeepAliveTime(-1); // live forever
pool.createThreads(5);
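For readers without this package at hand, the last configuration (an unbounded queue serviced by exactly 5 threads that live forever) can be approximated with the standard library. The sketch below assumes java.util.concurrent.ThreadPoolExecutor as a stand-in; prestartAllCoreThreads plays roughly the role of createThreads(5), and the class and method names are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class FixedWorkersSketch {

    /** Builds a pool of exactly five workers over an unbounded queue and
     *  returns {threads started eagerly, pool size right afterwards}. */
    static int[] startFiveWorkers() {
        // Core size == maximum size == 5. Core threads do not time out by
        // default, so idle workers stay alive until the pool is shut down.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        int started = pool.prestartAllCoreThreads(); // start workers before any task arrives
        int size = pool.getPoolSize();

        pool.shutdown(); // release the idle workers again
        return new int[] { started, size };
    }

    public static void main(String[] args) {
        int[] result = startFiveWorkers();
        System.out.println("started=" + result[0] + ", poolSize=" + result[1]);
    }
}
```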
DEFAULT_KEEPALIVETIME
public static final long DEFAULT_KEEPALIVETIME
The maximum time to keep worker threads alive waiting for new
tasks; used if not otherwise specified. Default value is one
minute (60000 milliseconds).
DEFAULT_MAXIMUMPOOLSIZE
public static final int DEFAULT_MAXIMUMPOOLSIZE
The maximum pool size; used if not otherwise specified. Default
value is essentially infinite (Integer.MAX_VALUE).
DEFAULT_MINIMUMPOOLSIZE
public static final int DEFAULT_MINIMUMPOOLSIZE
The minimum pool size; used if not otherwise specified. Default
value is 1.
handOff_
protected final Channel handOff_
The channel used to hand off the command to a thread in the pool.
keepAliveTime_
protected long keepAliveTime_
The maximum time for an idle thread to wait for a new task.
maximumPoolSize_
protected int maximumPoolSize_
The maximum number of threads allowed in the pool.
minimumPoolSize_
protected int minimumPoolSize_
The minimum number of threads to maintain in the pool.
poolSize_
protected int poolSize_
Current pool size.
shutdown_
protected boolean shutdown_
Shutdown flag - latches true when a shutdown method is called
in order to disable queuing/handoffs of new tasks.
threads_
protected final Map threads_
The set of active threads, declared as a map from workers to
their threads. This is needed by the interruptAll method. It
may also be useful in subclasses that need to perform other
thread management chores.
abortWhenBlocked
public void abortWhenBlocked()
Set the policy for blocked execution to throw a RuntimeException.
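A comparable abort-style policy exists in the standard library, which may help show what a caller sees. The sketch below is an analogy under stated assumptions: it uses java.util.concurrent.ThreadPoolExecutor with its AbortPolicy, which throws RejectedExecutionException (a RuntimeException subclass), rather than PooledExecutor. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class AbortPolicySketch {

    /** Returns true if the second task is refused with an unchecked exception. */
    static boolean rejectsWhenSaturated() {
        // One thread and a zero-capacity handoff queue: a second task
        // cannot be accepted while the first is still running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker); // occupies the only thread
            pool.execute(blocker); // nowhere to go: the policy throws
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected=" + rejectsWhenSaturated());
    }
}
```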
addThread
protected void addThread(Runnable command)
Create and start a thread to handle a new command. Call only
when holding lock.
awaitTerminationAfterShutdown
public void awaitTerminationAfterShutdown()
throws InterruptedException
Wait for a shutdown pool to fully terminate. This method may
only be called after invoking shutdownNow or
shutdownAfterProcessingCurrentlyQueuedTasks.
awaitTerminationAfterShutdown
public boolean awaitTerminationAfterShutdown(long maxWaitTime)
throws InterruptedException
Wait for a shutdown pool to fully terminate, or until the timeout
has expired. This method may only be called after
invoking shutdownNow or
shutdownAfterProcessingCurrentlyQueuedTasks.
Parameters:
maxWaitTime - the maximum time in milliseconds to wait
Returns:
true if the pool has terminated within the max wait period
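The shutdown-then-wait sequence described above can be sketched with the standard library. This is an analogy, not this class's API: it assumes java.util.concurrent.ExecutorService, where shutdown and awaitTermination play roles similar to shutdownAfterProcessingCurrentlyQueuedTasks and awaitTerminationAfterShutdown(long). The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: uses java.util.concurrent.ExecutorService, not PooledExecutor.
class AwaitTerminationSketch {

    /** Queues a few short tasks, shuts the pool down, and returns true
     *  if it terminated within maxWaitMillis milliseconds. */
    static boolean shutdownAndWait(long maxWaitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // must come first: waiting only makes sense after shutdown
        try {
            return pool.awaitTermination(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated=" + shutdownAndWait(10_000));
    }
}
```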
createThreads
public int createThreads(int numberOfThreads)
Create and start up to numberOfThreads threads in the pool.
Return the number created. This may be less than the number
requested if creating more would exceed maximum pool size bound.
discardOldestWhenBlocked
public void discardOldestWhenBlocked()
Set the policy for blocked execution to discard the oldest
unhandled request.
discardWhenBlocked
public void discardWhenBlocked()
Set the policy for blocked execution to return without
executing the request.
drain
public List drain()
Remove all unprocessed tasks from pool queue, and return them in
a java.util.List. This method should be used only when there are
not any active clients of the pool. Otherwise you face the
possibility that the method will loop pulling out tasks as
clients are putting them in. This method can be useful after
shutting down a pool (via shutdownNow) to determine whether there
are any pending tasks that were not processed. You can then, for
example, execute all unprocessed commands via code along the lines
of:
List tasks = pool.drain();
for (Iterator it = tasks.iterator(); it.hasNext();)
( (Runnable)(it.next()) ).run();
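A self-contained variant of the same pattern is sketched below. It is an analogy under stated assumptions: it uses java.util.concurrent.ThreadPoolExecutor, whose shutdownNow returns the never-started tasks directly, in place of the shutdownNow plus drain pair of this class. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class DrainSketch {

    /** Returns {number of unprocessed tasks recovered, number run by the caller}. */
    static int[] drainAndRun() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        AtomicInteger ran = new AtomicInteger();

        // The first task keeps the only worker busy until it is interrupted.
        pool.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted by shutdownNow: just finish
            }
        });
        // These three wait in the queue behind the busy worker.
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> ran.incrementAndGet());
        }

        // Stop the pool and collect the tasks that never started.
        List<Runnable> pending = pool.shutdownNow();
        for (Runnable task : pending) {
            task.run(); // run the leftovers in the calling thread
        }
        return new int[] { pending.size(), ran.get() };
    }

    public static void main(String[] args) {
        int[] result = drainAndRun();
        System.out.println("pending=" + result[0] + ", ran=" + result[1]);
    }
}
```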
execute
public void execute(Runnable command)
throws InterruptedException
Arrange for the given command to be executed by a thread in this
pool. The method normally returns when the command has been
handed off for (possibly later) execution.
Specified by: execute in interface Executor
getKeepAliveTime
public long getKeepAliveTime()
Return the number of milliseconds to keep threads alive waiting
for new commands. A negative value means to wait forever. A zero
value means not to wait at all.
getMaximumPoolSize
public int getMaximumPoolSize()
Return the maximum number of threads to simultaneously execute.
New unqueued requests will be handled according to the current
blocking policy once this limit is exceeded.
getMinimumPoolSize
public int getMinimumPoolSize()
Return the minimum number of threads to simultaneously execute.
(Default value is 1). If fewer than the minimum number are
running upon reception of a new request, a new thread is started
to handle this request.
getPoolSize
public int getPoolSize()
Return the current number of active threads in the pool. This
number is just a snapshot, and may change immediately upon
returning.
getTask
protected Runnable getTask()
throws InterruptedException
Get a task from the handoff queue, or null if shutting down.
interruptAll
public void interruptAll()
Interrupt all threads in the pool, causing them all to
terminate. Assuming that executed tasks do not disable (clear)
interruptions, each thread will terminate after processing its
current task. Threads will terminate sooner if the executed tasks
themselves respond to interrupts.
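What it means for a task to respond to interrupts can be sketched with a plain thread standing in for a pool worker. The example assumes only java.lang.Thread; the class and method names are hypothetical, and the polling loop is just one way for a task to notice an interrupt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: a single plain Thread stands in for one pool worker.
class InterruptResponsiveSketch {

    /** Returns true if the task noticed the interrupt and its thread ended. */
    static boolean stopsWhenInterrupted() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        // A long-running task that polls the interrupt flag between steps
        // instead of clearing or ignoring it.
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield(); // stand-in for one unit of real work
            }
            sawInterrupt.set(true);
        });

        worker.start();
        worker.interrupt(); // what interruptAll would do to each pool thread
        try {
            worker.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped=" + stopsWhenInterrupted());
    }
}
```

A task that never checks the flag and never blocks in an interruptible call would instead run to completion before its thread could terminate.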
isTerminatedAfterShutdown
public boolean isTerminatedAfterShutdown()
Return true if a shutdown method has succeeded in terminating all
threads.
runWhenBlocked
public void runWhenBlocked()
Set the policy for blocked execution to be that the current
thread executes the command if there are no available threads in
the pool.
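The effect of a run-in-caller policy can be observed with the standard library's counterpart. This sketch assumes java.util.concurrent.ThreadPoolExecutor with its CallerRunsPolicy as a stand-in for runWhenBlocked; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class CallerRunsSketch {

    /** Returns true if the overflow task ran on the submitting thread. */
    static boolean overflowRunsInCaller() {
        // One thread and a zero-capacity handoff queue, so a second task
        // finds no available thread while the first is running.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keep the only pool thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // No pool thread is free, so the policy runs this task right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("ranInCaller=" + overflowRunsInCaller());
    }
}
```

One consequence of this policy is that the submitting thread is slowed down while it runs the task, which acts as a simple form of back-pressure.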
setKeepAliveTime
public void setKeepAliveTime(long msecs)
Set the number of milliseconds to keep threads alive waiting for
new commands. A negative value means to wait forever. A zero
value means not to wait at all.
setMaximumPoolSize
public void setMaximumPoolSize(int newMaximum)
Set the maximum number of threads to use. Decreasing the pool
size will not immediately kill existing threads, but they may
later die when idle.
setMinimumPoolSize
public void setMinimumPoolSize(int newMinimum)
Set the minimum number of threads to use.
shutdownAfterProcessingCurrentlyQueuedTasks
public void shutdownAfterProcessingCurrentlyQueuedTasks()
Terminate threads after processing all elements currently in
queue. Any tasks entered after this point will be discarded. A
shut down pool cannot be restarted.
shutdownAfterProcessingCurrentlyQueuedTasks
public void shutdownAfterProcessingCurrentlyQueuedTasks(PooledExecutor.BlockedExecutionHandler handler)
Terminate threads after processing all elements currently in
queue. Any tasks entered after this point will be handled by the
given BlockedExecutionHandler. A shut down pool cannot be
restarted.
shutdownNow
public void shutdownNow()
Interrupt all threads and disable construction of new
threads. Any tasks entered after this point will be discarded. A
shut down pool cannot be restarted.
shutdownNow
public void shutdownNow(PooledExecutor.BlockedExecutionHandler handler)
Interrupt all threads and disable construction of new
threads. Any tasks entered after this point will be handled by
the given BlockedExecutionHandler. A shut down pool cannot be
restarted.
waitWhenBlocked
public void waitWhenBlocked()
Set the policy for blocked execution to wait until a thread
is available.
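A wait-style policy can be approximated in the standard library with a custom handler. The sketch below is an assumption, not this class's implementation: it uses java.util.concurrent.ThreadPoolExecutor, and the hypothetical BlockingPolicy waits for room in the queue rather than for a free thread, which is a slightly different condition from the one described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: uses java.util.concurrent.ThreadPoolExecutor, not PooledExecutor.
class WaitWhenBlockedSketch {

    /** Hypothetical policy: make the caller wait until the queue has room. */
    static class BlockingPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                pool.getQueue().put(task); // blocks the submitting thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        }
    }

    /** Pushes taskCount tasks through a one-thread, one-slot pool and
     *  returns how many of them actually ran. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), new BlockingPolicy());
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> ran.incrementAndGet()); // may wait, never drops
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("ran=" + runAll(10));
    }
}
```

This sketch submits everything from one thread before shutting down. With concurrent submitters, a task put into the queue while another thread shuts the pool down might never run, so a production version would need more care.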
workerDone
protected void workerDone(PooledExecutor.Worker w)
Cleanup method called upon termination of worker thread.