logo头像

Always believe youself.

线程池复习

创建

通过Executors类提供的方法。

newCachedThreadPool

创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。

private static void createCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                // 获取线程名称,默认格式:pool-1-thread-1
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                // 等待2秒
                sleep(2000);
            });
        }
    }

newFixedThreadPool

创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。

private static void createFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                // 获取线程名称,默认格式:pool-1-thread-1
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                // 等待2秒
                sleep(2000);
            });
        }
    }

newScheduledThreadPool

创建一个周期性的线程池,支持定时及周期性执行任务。

private static void createScheduledThreadPool() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        System.out.println(DateUtil.now() + " 提交任务");
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.schedule(() -> {
                // 获取线程名称,默认格式:pool-1-thread-1
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                // 等待2秒
                sleep(2000);
            }, 3, TimeUnit.SECONDS);
        }
    }

设置了延迟3秒,所以提交后3秒才开始执行任务。因为这里设置核心线程数为3个,而线程不足会进入队列等待线程空闲,所以日志间隔2秒输出。

newSingleThreadExecutor

创建一个单线程的线程池,可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

private static void createSingleThreadPool() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                // 获取线程名称,默认格式:pool-1-thread-1
                System.out.println(DateUtil.now() + " " + Thread.currentThread().getName() + " " + index);
                // 等待2秒
                sleep(2000);
            });
        }
    }

通过ThreadPoolExecutor类自定义。

ThreadPoolExecutor类提供了4种构造方法,可根据需要来自定义一个线程池。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 省略...
    }

7个参数如下:

  • corePoolSize:核心线程数,线程池中始终存活的线程数。

  • maximumPoolSize: 最大线程数,线程池中允许的最大线程数。

  • keepAliveTime: 存活时间,线程没有任务执行时最多保持多久时间会终止。

  • unit: 单位,参数keepAliveTime的时间单位,7种可选。(TimeUnit.DAYS/天;TimeUnit.HOURS/小时;TimeUnit.MINUTES/分;TimeUnit.SECONDS/秒;TimeUnit.MILLISECONDS/毫秒;TimeUnit.MICROSECONDS/微妙;TimeUnit.NANOSECONDS/纳秒)

  • workQueue: 一个阻塞队列,用来存储等待执行的任务,均为线程安全,7种可选。

参数 描述
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
SynchronousQueue 一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
DelayQueue 一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

较常用的是LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory: 线程工厂,主要用来创建线程,默及正常优先级、非守护线程。

  • handler:拒绝策略,拒绝处理任务时的策略,4种可选,默认为AbortPolicy。

参数 描述
AbortPolicy 拒绝并抛出异常。
CallerRunsPolicy 重试提交当前的任务,即再次调用运行该任务的execute()方法。
DiscardOldestPolicy 抛弃队列头部(最旧)的一个任务,并执行当前任务。
DiscardPolicy 抛弃当前任务。
  • 任务队列的策略

    • 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    • 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    • 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

执行顺序

  • 当线程数小于核心线程数时,创建线程。

  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

  • 当线程数大于等于核心线程数,且任务队列已满:

    • 若线程数小于最大线程数,创建线程。

    • 若线程数等于最大线程数,抛出异常,拒绝任务。

优略比较

说是5种方式的比较,其实就是2种方式的比较,为什么这么说?因为Executors类提供的4种方式,其底层其实都是通过ThreadPoolExecutor类来实现的。换句话说,就是Executors类工厂通过参数的组合,组装出了上面提到的4种类型线程池供不同场景使用。我们可以通过查看Executors类的源码来看看:

  • newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    } 
    

因为SynchronousQueue队列不保持它们,直接提交给线程,相当于队列大小为0,而最大线程数为Integer.MAX_VALUE,所以线程不足时,会一直创建新线程,等到线程空闲时,又有60秒存活时间,从而实现了一个可缓存的线程池。

  • newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
    

    }

因为核心线程数与最大线程数相同,所以线程池的线程数是固定的,而且没有限制队列的大小,所以多余的任务均会被放到队列排队,从而实现一个固定大小,可控制并发数量的线程池。

  • newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {

        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    

  因为使用了延迟队列,只有在延迟期满时才能从中提取到元素,从而实现定时执行的线程池。而周期性执行是配合上层封装的其他类来实现的,可以看ScheduledExecutorService类的scheduleAtFixedRate方法。

  • newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

因为核心线程数与最大线程数相同,均为1,所以线程池的线程数是固定的1个,而且没有限制队列的大小,所以多余的任务均会被放到队列排队,从而实现一个单线程按指定顺序执行的线程池。

建议

虽然看上去Executors类的封装,可以简化我们的使用,但事实上,阿里代码规范《阿里巴巴Java开发手册》中明确不建议使用Executors类提供的这4种方法:

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors返回的线程池对象的弊端如下:

FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

应该使用ThreadPoolExecutor类来创建线程池,根据自己需要的场景来创建一个合适的线程池。

线程池的三种队列区别

SynchronousQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));
  • SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

  • 拥有公平(FIFO)和非公平(LIFO)策略,非公平侧罗会导致一些数据永远无法被消费的情况?

  • 使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

LinkedBlockingQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));

LinkedBlockingQueue是一个无界缓存等待队列。

当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。

生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是int类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE,等同于如下:

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(Integer.MAX_VALUE), r -> new Thread(r, "ThreadTest"));

ArrayBlockingQueue

private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会执行拒绝策略。