Thread pool essentials     Category: java
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
                              
                              
static class DefaultThreadFactory implements ThreadFactory
 
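corePoolSize: number of core threads
maximumPoolSize: maximum number of threads
keepAliveTime: how long an idle thread may wait for work before it is terminated
unit: time unit of keepAliveTime
workQueue: queue that buffers submitted tasks
handler: policy applied to rejected tasks
threadFactory: thread factory; can set a thread name prefix, among other things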
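
A minimal sketch of passing all seven constructor arguments, including a factory that sets a thread name prefix. The names NamedPoolDemo and NamedThreadFactory and the sizes used are assumptions for illustration, not part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolDemo {
    // Hypothetical factory: names threads "<prefix>-1", "<prefix>-2", ...
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger seq = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + "-" + seq.getAndIncrement());
        }
    }

    // Runs one task in a freshly built pool and returns the worker thread's name.
    static String workerThreadName(String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4,                                  // corePoolSize, maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime, unit
                new ArrayBlockingQueue<Runnable>(100), // bounded workQueue
                new NamedThreadFactory(prefix),        // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName("order-worker"));
    }
}
```

A recognizable prefix makes thread dumps and log lines easier to attribute to a specific pool.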


newFixedThreadPool
newSingleThreadExecutor
newCachedThreadPool     corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE, SynchronousQueue
newSingleThreadScheduledExecutor
newScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }


private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
     
     
Note the difference between work and worker
work    the task to be done
worker  the thread that does it

BlockingQueue<Runnable> workQueue

HashSet<Worker> workers = new HashSet<Worker>();


Whether idle core threads may be reclaimed; default false
private volatile boolean allowCoreThreadTimeOut;
If false (default), core threads stay alive even when idle.
If true, core threads use keepAliveTime to time out waiting for work.
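
A small sketch of the effect of this flag, assuming a single-thread pool with a 50 ms keepAliveTime (the class and method names are hypothetical). The method waits up to one second for the idle core thread to be reclaimed and then reports the pool size.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    // Returns the pool size observed after the only worker has been idle for a while.
    static int poolSizeAfterIdle(boolean allowTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Setting true requires keepAliveTime > 0, otherwise the call throws.
        pool.allowCoreThreadTimeOut(allowTimeout);
        try {
            pool.submit(() -> { }).get(); // forces the core thread to be created
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default:       " + poolSizeAfterIdle(false));
        System.out.println("allow timeout: " + poolSizeAfterIdle(true));
    }
}
```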


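Common task queues
SynchronousQueue       no capacity; a task is handed directly to a waiting thread
LinkedBlockingQueue    linked-list based; default capacity Integer.MAX_VALUE
ArrayBlockingQueue     array based; bounded, which helps prevent resource exhaustion
PriorityBlockingQueue  priority queue

DelayedWorkQueue
Executors.newScheduledThreadPool creates its pool with a DelayedWorkQueue.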
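
Since execute() enqueues with the non-blocking offer(), the capacity of the queue decides when the pool starts adding threads or rejecting. The sketch below only exercises the queues themselves; QueueOfferDemo and accepted are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueOfferDemo {
    // Offers `count` no-op tasks without blocking; returns how many were accepted.
    static int accepted(BlockingQueue<Runnable> queue, int count) {
        int ok = 0;
        for (int i = 0; i < count; i++) {
            Runnable noop = () -> { };
            if (queue.offer(noop)) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        // No consumer is waiting, so a SynchronousQueue accepts nothing.
        System.out.println(accepted(new SynchronousQueue<Runnable>(), 3));
        // A bounded queue accepts tasks only up to its capacity.
        System.out.println(accepted(new ArrayBlockingQueue<Runnable>(2), 3));
        // The default LinkedBlockingQueue is effectively unbounded.
        System.out.println(accepted(new LinkedBlockingQueue<Runnable>(), 3));
    }
}
```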


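The 4 rejection policies
AbortPolicy           throws RejectedExecutionException (the default)
DiscardPolicy         silently drops the rejected task
DiscardOldestPolicy   drops the oldest queued task, then retries the new one
CallerRunsPolicy      runs the rejected task in the calling thread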
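
To compare the four policies, the sketch below saturates a pool of one thread with a queue of capacity one, submits a third task, and records which tasks ran. The class RejectionDemo and the labels t1, t2, t3 are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // t1 occupies the only thread, t2 fills the queue, t3 triggers the handler.
    static List<String> run(RejectedExecutionHandler handler) {
        List<String> ran = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        try {
            pool.execute(() -> { awaitQuietly(release); ran.add("t1"); });
            pool.execute(() -> ran.add("t2"));
            try {
                pool.execute(() -> ran.add("t3"));
            } catch (RejectedExecutionException e) {
                ran.add("t3 rejected");
            }
        } finally {
            release.countDown(); // let t1 finish so the queue can drain
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<String>(ran);
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With CallerRunsPolicy the submitting thread does the work itself, which slows down submission and acts as a simple form of back-pressure.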




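How execute() handles a new task:

1. Fewer than corePoolSize threads are running: start a new thread with the given command as its first task.

2. corePoolSize or more threads are running: add the task to the work queue.

3. The work queue is full: create a new thread, as long as the thread count stays within maximumPoolSize.

4. The queue is full and maximumPoolSize has been reached: apply the rejection policy.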
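
The order described above can be observed with a deliberately tiny pool. This is a sketch assuming corePoolSize=1, maximumPoolSize=2 and a queue of capacity 1; every task blocks until released, so the thread and queue counts are stable. SubmitFlowDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitFlowDemo {
    // Submits four blocking tasks and records the pool state after each submission.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default handler: AbortPolicy
        List<String> steps = new ArrayList<String>();
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocked);
                    steps.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    steps.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // unblock the tasks so the pool can shut down
            pool.shutdown();
        }
        return steps;
    }

    public static void main(String[] args) {
        for (String step : trace()) {
            System.out.println(step);
        }
    }
}
```

Note that the third task runs on the newly created thread while the second task is still waiting in the queue, so tasks do not necessarily start in submission order.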


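Two important pieces of state
runState and workerCount

double-check: after queuing a task, execute() re-reads ctl to confirm the pool is still running and still has a worker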

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }


ctl is an atomic variable that holds both runState and workerCount


The main pool control state, ctl, is an atomic integer packing two conceptual fields
workerCount, indicating the effective number of threads
runState,    indicating whether running, shutting down etc
     
In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads 
     
The runState provides the main lifecycle control, taking on values:
     
RUNNING:  Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP:     Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
TIDYING:  All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING  will run the terminated() hook method
TERMINATED: terminated() has completed
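
A standalone sketch of the packing: the top 3 bits of the int hold runState and the low 29 bits hold workerCount. The constants mirror those in the OpenJDK 8 ThreadPoolExecutor source; the class CtlPacking itself is hypothetical and only illustrates the bit arithmetic.

```java
class CtlPacking {
    static final int COUNT_BITS = Integer.SIZE - 3;      // 29 bits for workerCount
    static final int CAPACITY = (1 << COUNT_BITS) - 1;   // (2^29)-1, the low-bit mask

    // runState lives in the high-order 3 bits; the values are numerically ordered.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // RUNNING is the only negative state, so one comparison is enough.
    static boolean isRunning(int c)  { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(workerCountOf(c));           // low 29 bits
        System.out.println(runStateOf(c) == RUNNING);   // high 3 bits
        System.out.println(isRunning(ctlOf(STOP, 5)));
    }
}
```

Keeping both fields in one AtomicInteger lets a single compare-and-set check the run state and change the worker count together.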
     
     
shutdown()  vs   shutdownNow()       


    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }


    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }     
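
The difference is easiest to see from the outside. The sketch below, assuming a single-thread pool with one running task and two queued tasks, counts how many tasks finish normally, how many are interrupted, and how many are handed back unstarted. ShutdownDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    static String run(boolean now) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger finished = new AtomicInteger();
        AtomicInteger interrupted = new AtomicInteger();
        Runnable task = () -> {
            started.countDown();
            try {
                release.await();
                finished.incrementAndGet();
            } catch (InterruptedException e) {
                interrupted.incrementAndGet();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 3; i++) {
            pool.execute(task); // first one runs, the other two wait in the queue
        }
        int unstarted = 0;
        try {
            started.await(); // the first task is now running
            if (now) {
                unstarted = pool.shutdownNow().size(); // STOP: interrupt and drain
            } else {
                pool.shutdown();                       // SHUTDOWN: finish queued work
            }
            release.countDown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "finished=" + finished.get() + " interrupted=" + interrupted.get()
                + " unstarted=" + unstarted;
    }

    public static void main(String[] args) {
        System.out.println("shutdown():    " + run(false));
        System.out.println("shutdownNow(): " + run(true));
    }
}
```

shutdownNow() can only interrupt a running task; a task that ignores interruption keeps running until it returns on its own.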
            
workQueue.offer(command)




Analysis of the execute method



    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
