侧边栏壁纸
博主头像
一定会去到彩虹海的麦当

说什么呢?约定好的事就一定要做到啊!

  • 累计撰写 63 篇文章
  • 累计创建 16 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

[并发进阶]——线程池总结

一定会去到彩虹海的麦当
2022-06-14 / 0 评论 / 0 点赞 / 49 阅读 / 5,922 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-07-28,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

什么是线程池?

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

有哪几种创建线程池的方式?

使用 Executors 工具类创建线程池

Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 ExecutorService 接口。

主要有 newSingleThreadExecutor,newFixedThreadPool,newCachedThreadPool,newScheduledThreadPool

  • newFixedThreadPool:固定线程数的线程池。corePoolSize = maximumPoolSize,keepAliveTime为0,工作队列使用无界的

  • newSingleThreadExecutor:只有一个线程的线程池。corePoolSize = maximumPoolSize = 1,keepAliveTime为0, 工作队列使用无界的

  • newCachedThreadPool: 按需要创建新线程的线程池。核心线程数为0,最大线程数为 Integer.MAX_VALUE,keepAliveTime为60秒,工作队列使用同步移交 SynchronousQueue。该线程池可以无限扩展,当需求增加时,可以添加新的线程,而当需求降低时会自动回收空闲线程。适用于执行很多的短期异步任务,或者是负载较轻的服务器。

  • newScheduledThreadPool:创建一个以延迟或定时的方式来执行任务的线程池,工作队列为 DelayedWorkQueue。适用于需要多个后台线程执行周期任务。

Executors 各个方法的弊端:

  • newFixedThreadPool 和 newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。

  • newCachedThreadPool 和 newScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

使用 ThreadPoolExecutor 构造函数创建线程池

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写程序的同学更加明确线程池的运行规则,规避资源耗尽的风险

我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

ThreadPoolExecutor 构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数

  • corePoolSize :核心线程数,定义了最小可以同时运行的线程数。

  • maximumPoolSize :线程池中允许存在的最大工作线程数。

  • workQueue:阻塞队列的长度。当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,任务就会被存放在阻塞队列中。

ThreadPoolExecutor 其他常见参数

  1. keepAliveTime:线程池中的线程数大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit:keepAliveTime 参数的时间单位。
  3. threadFactory:创建新线程的线程工厂
  4. handler :当工作队列已满并且同时运行的线程数达到最大工作线程数时,新加入的任务就会走拒绝策略

使用ThreadPoolExecutor

// 创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
                   MAXIMUM_POOL_SIZE,
                   KEEP_ALIVE,
                   TimeUnit.SECONDS,
                   sPoolWorkQueue,
                  sThreadFactory);
// 向线程池提交任务
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        ... // 线程执行的任务
    }
});
// 关闭线程池
threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表

线程池的运作流程

img

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
  • 如果阻塞队列选择了有界队列,那么任务超过了阻塞队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

线程池中的各个状态

  • RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务。

  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。

  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。

  • TIDYING:所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。

  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个。

img

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名 高3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余 任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列 任务
TIDYING 010 任务全执行完毕,活动线程为 0 即将进入 终结
TERMINATED 011 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

img

ctl 是一个打包两个概念字段的原子整数。

1)workerCount:指示线程的有效数量;

2)runState:指示线程池的运行状态,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 等状态

ThreadPoolExecutor 拒绝策略

ThreadPoolExecutor 拒绝策略定义:

如果当工作队列已满并且同时运行的线程数达到最大工作线程数时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy(默认):抛出 RejectedExecutionException来拒绝新任务的处理。

  • ThreadPoolExecutor.CallerRunsPolicy:用调用者所在的线程来执行任务。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。

  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。

  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃最早的未处理的任务。

img

举个例子:Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 拒绝策略的话,配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy

如何终止线程池?

shutdown:“温柔”的关闭线程池。不接受新任务,但是在关闭前会将之前提交的任务处理完毕。

shutdownNow:“粗暴”的关闭线程池,也就是直接关闭线程池,通过 Thread#interrupt() 方法终止所有线程,不会等待之前提交的任务执行完毕。但是会返回队列中未处理的任务。

线程池中 submit() 和 execute() 方法有什么区别?

接收参数:execute() 只能执行 Runnable 类型的任务。submit() 可以执行 Runnable 和 Callable 类型的任务

返回值:submit() 方法可以获取异步计算结果 Future 对象,而 execute() 没有

异常处理:submit() 方便 Exception 处理

submit()使用示例

public class ExecutorsTest {
    private static final String SUCCESS = "success";
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());



        System.out.println("------------------任务开始执行---------------------");

        Future<String> future = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                System.out.println("submit方法执行任务完成" + "   thread name: " + Thread.currentThread().getName());
                return SUCCESS;
            }
        });

        try {
            String s = future.get();
            if (SUCCESS.equals(s)) {
                String name = Thread.currentThread().getName();
                System.out.println("经过返回值比较,submit方法执行任务成功    thread name: " + name);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("-------------------main thread end---------------------");
    }
}

output

------------------任务开始执行---------------------
submit方法执行任务完成   thread name: pool-1-thread-1
经过返回值比较,submit方法执行任务成功    thread name: main
-------------------main thread end---------------------

submit()返回FutureTask对象,通过这个FutureTask对象调用get()可以返回submit()方法传入的一个泛型类参数result对象,如果是Callable直接通过call()返回。这个返回值的可以用来校验任务执行是否成功。

主线程会一直阻塞,等待线程池中的任务执行完后,在执行后面的语句。

0

评论区