线程池要点
所属分类 java
浏览量 1432
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
static class DefaultThreadFactory implements ThreadFactory
corePoolSize: 核心线程数
maximumPoolSize:最大线程数
keepAliveTime: 线程空闲时间
unit: 空闲时间单位
workQueue:任务缓冲队列
handler: 任务拒绝策略
threadFactory 线程工厂 可指定线程名前缀等
newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool maxSize=Integer.MAX_VALUE + SynchronousQueue
newSingleThreadScheduledExecutor
newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
注意 work 与 worker 的区别
work 工作 任务
worker 工人 工作者
BlockingQueue workQueue
HashSet workers = new HashSet();
核心线程是否允许被回收,默认 false
private volatile boolean allowCoreThreadTimeOut;
If false (default), core threads stay alive even when idle.
If true, core threads use keepAliveTime to time out waiting for work.
几种任务队列
SynchronousQueue
LinkedBlockingQueue 基于链表 默认容量 Integer.MAX_VALUE
ArrayBlockingQueue 基于数组 有界 可防止资源耗尽
PriorityBlockingQueue 优先级队列
DelayedWorkQueue
Executors.newScheduledThreadPool 使用DelayedWorkQueue创建线程池。
4种拒绝策略
AbortPolicy
DiscardPolicy
DiscardOldestPolicy
CallerRunsPolicy
If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.
线程数小于 corePoolSize 创建线程并运行任务
线程数 大于 corePoolSize ,把任务添加到任务队列
任务队列满了之后 ,创建 新线程 直到 maximumPoolSize
执行拒绝策略
两个重要状态
runState and workerCount
double-check
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl 原子变量 同时存放了 runState and workerCount
The main pool control state, ctl, is an atomic integer packing two conceptual fields
workerCount, indicating the effective number of threads
runState, indicating whether running, shutting down etc
In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads
The runState provides the main lifecycle control, taking on values:
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed
shutdown() vs shutdownNow()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
workQueue.offer(command)
execute 方法分析
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上一篇
下一篇
lua动态方法调用实例
aerospike中使用lua注意点
指数基金的几种形式
员工离职前的9种征兆
ThreadPoolExecutor中的ctl变量
2019年一季度A股投资者结构