本文接上文,继续学习线程池内部结构,以及线程池中最常见的阻塞队列类型。

线程池内部结构

四部分组成如下:

  1. 线程池管理器,负责线程池的创建、销毁、添加任务等管理操作
  2. 工作线程
  3. 任务队列,作为缓冲机制
  4. 任务,任务要求实现统一的接口

阻塞队列


如图所示,五种线程池对应3种阻塞队列。

LinkedBlockingQueue

对于FixedThreadPoolSingleThreadExector,使用的是容量为Integer.MAX_VALUE的LinkedBlockingQueue,可以认为是无界队列。

SynchronousQueue

对应的线程池是CacheThreadPool,这也是一个可以无限扩展的队列。但是cacheThreadPool的线程数是可以一直增加的,所以对于任务,此线程池是来者不拒。因此SynchronousQueue值应当大一点。

DelayedWorkQueue

对应于ScheduledThreadPool和SingleThreadScheduledExecutor,特点是可以延迟任务。
DelayedWorkQueue的特点是内部元素不是按照放入时间排序,而是按照延迟时间长短来排序,内部使用的是

ForkJoinPool使用的内部类WorkQueue,且fork出来的子任务有上限1<<26

不要使用Executors.new方法来创建线程池。

FixedThreadPool

这是线程数目固定的线程池。

public static ExecutorService newFixedThreadPool(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

内部使用的是核心线程数等于最大线程数的ThreadPoolExecutor。队列使用的是无边界的LinkedBlockingQueue。所以任务请求多,队列会一直堆下去,导致OOM。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() { 
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

把核心线程数和最大线程数都设置为1,任务队列还是无边界的LinkBlockingQueue,也会造成同样问题。

CachedThreadPool

public static ExecutorService newCachedThreadPool() { 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

虽然使用的SynchronousQueue,但是最大线程数被设置为Integer.MAX_VALUE,会导致线程一直创建,超出OS的限制。

ScheduleThreadPool和SingleThreadScheduledExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

其中ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,它的构造方法如下

public ScheduledThreadPoolExecutor(int corePoolSize) { 
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}

采用的任务队列是DelayedWorkQueue,还是一个无界队列。

到这里可以看出,通过Executors.new的方法创建出来的线程池,要么没有限制最大线程数,要么没有限制阻塞队列的大小,最终会导致OOM

合适的线程数量是多少

CPU密集型任务

比如加密、解密、压缩、计算等需要耗费CPU资源的任务。这样的任务一般设置线程数为CPU核心数的1~2倍。虽然设置了很多线程,但是CPU负荷满了,会导致上下文切换麻烦。

耗时IO型任务

比如数据库、文件读写、网络通信,特点是不耗CPU资源,但是IO耗时多,这种线程数一般设置CPU核心数很多倍。

《Java并发编程实战》 作者: Brain Geotz

提供一个计算方法:


线程数=CPU核心数*(1+平均等待时间/平均工作时间)


结论

  • 平均工作时间占比越高,越少线程
  • 平均等待时间占比越高,线程越多

定制自己的线程池

核心线程数

corePoolSize参考上面内容设置

阻塞队列

ArrayBlockingQueue,这是使用数组实现的队列,不可扩容

线程工厂

可以使用默认的defaultThreadFactory,一般我们会自定义名字,此时可以使用com.google.common.util.concurrent.ThreadFactory来创建工厂

ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory rpcFactory = builder.setNameFormat("rpc-pool-%d").build();

为我们生成了名字为"rpc-pool-%d"格式的线程。

拒绝策略

配合上一篇文章,使用默认的四个,或者自定义。

private static class CustomRejectionHandler implements RejectedExecutionHandler { 
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
        //打印日志、暂存任务、重新执行等拒绝策略
    } 
}

只需实现RejectedExecutionHandler 接口即可。

如何正确关闭线程池

ExecutorService service = Executors.newFixedThreadPool(10);
 for (int i = 0; i < 100; i++) { 
    service.execute(new Task());
 }

ThreadPoolExecutor中提供5中方法:

  • void shutdown()
  • boolean isShutdown()
  • boolean isTerminated()
  • boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException
  • List<Runnable>shutdownNow()

shutdown()

可以安全的关闭一个线程池,调用shutdown方法后,线程池中还在执行任务的线程和队列中等待的任务都会被执行完才彻底关闭。

调用shutdown方法后,再提交任务会被拒绝策略拒绝。

isShutdown()

只是来判断线程是否开始进行关闭工作,但返回true不代表线程池彻底关闭了。

isTerminated()

用来判断线程池是否真正的终结。

awaitTermination()

用来判断线程池状态的。比如传入参数10s,那么它会陷入10s等待,直到发生下面三种情况之一:

  1. 等待期间,线程池完成所有提交任务并关闭,返回true
  2. 等待超时,返回false
  3. 等待期间被中断,抛出InterruptedException
    可以根据awaitTermination方法的返回值来判断下一步该干什么

shutdownNow()

与第一种明显区别就是Now,它会给线程池中所有线程发送interrupt信号,尝试中断,然后将任务队列中所有等待的任务转移到list中返回。

public List<Runnable> shutdownNow() { 
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    
    try { 
        checkShutdownAccess();
        advanceRunState(STOP);
        //让每一个已经启动的线程中断
        interruptWorkers();
        tasks = drainQueue();
    } finally { 
        mainLock.unlock();
    } 
 
    tryTerminate();
    return tasks;
 }

线程池复用的原理

原理

线程和任务解耦,摆脱之前一个线程一个任务的限制。核心是对Thread进行改装,不是每次执行Thread.start()来创建新线程,而是让每个线程去执行一个循环操作。这个循环操作不停去检查是否还有任务在等待,是则去调用任务的run方法。
创建新线程的时机

线程池会逐一判断corePoolSize、workQueue、maxPoolSize。


从execute开始分析

public void execute(Runnable command) { 
    if (command == null) 
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true)) 
            return;
        c = ctl.get();
    } 
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command)) 
            reject(command);
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false);
    } 
    else if (!addWorker(command, false)) 
        reject(command);
}

代码短小精悍。

代码解析

//如果传入的Runnable的空,就抛出异常
if (command == null) 
    throw new NullPointerException();

先判断提交的任务是否为null,接着判断当前线程数是否小于corePoolSize,小则调用addWorked方法增加一个。

if (workerCountOf(c) < corePoolSize) { 
    if (addWorker(command, true)) 
        return;
        c = ctl.get();
}

addWorker有两参数,一是刚提交的任务,二是布尔值。true代表增加线程时判断当前线程是否小于corePoolSize,小则加,大则不加。false则判断当前线程是否小于maxPoolSize。因此这里的布尔值是决定是以核心线程数还是最大线程数为界限进行判断。
下面的代码为:

if (isRunning(c) && workQueue.offer(command)) { 
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) 
        reject(command);
    else if (workerCountOf(recheck) == 0) 
        addWorker(null, false);
}

到这里说明当前线程数大于或等于线程核心数|addWorker失败了,那么需要通过if (isRunning(c) && workQueue.offer(command))判断线程池状态是否为running,是running就放到任务队列中,也就是workQueue.offer(command)。不处于running状态就移除任务并执行拒绝策略(reject(command))。看上面最后一个分支,进入这里说明是running状态,为防止之前有线程以外退出,没有线程在执行,因此执行检查工作线程是否为0的情况,是就addWorker 创建新线程。

最后一段代码是

else if (!addWorker(command, false)) 
        reject(command);

到这里说明线程不是running状态或者线程数大于核心线程数且队列已满,此时需要添加新线程直到达到maxPoolSize,所以此处会再次调用addWorker并传入false。失败则说明线程数达到了maxPoolSize,此时执行拒绝策略。

addWorker

addWorker是这段代码的核心调用方法,它可以理解为对Thread的包装,addWorker方法会添加并启动Worker。Worker内部有一个Thread对象,它正是最终真正执行任务的线程,所以一个Worker对应线程池中的一个线程

runWorker(Worker w) {
    Runnable task = w.firstTask;
    while (task != null || (task = getTask()) != null) {
        try {
            task.run();
        } finally {
            task = null;
        }
    }
}

它用while循环不停的从workQueue中获取任务,并用task的run方法执行具体任务。

Last modification:April 10th, 2020 at 12:40 am