导航:首页 > 源码编译 > 线程池源码分析

线程池源码分析

发布时间:2023-10-10 09:37:30

Ⅰ 合理使用线程池以及线程变量

背景

随着计算技术的不断发展,3纳米制程芯片已进入试产阶段,摩尔定律在现有工艺下逐渐面临巨大的物理瓶颈,通过多核处理器技术来提升服务器的性能成为提升算力的主要方向。

在服务器领域,基于java构建的后端服务器占据着领先地位,因此,掌握java并发编程技术,充分利用CPU的并发处理能力是一个开发人员必修的基本功,本文结合线程池源码和实践,简要介绍了线程池和线程变量的使用。

线程池概述

线程池是一种“池化”的线程使用模式,通过创建一定数量的线程,让这些线程处于就绪状态来提高系统响应速度,在线程使用完成后归还到线程池来达到重复利用的目标,从而降低系统资源的消耗。

总体来说,线程池有如下的优势:

线程池的使用

在java中,线程池的实现类是ThreadPoolExecutor,构造函数如下:

可以通过 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,handler)来创建一个线程池。

在构造函数中,corePoolSize为线程池核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程超时也会回收。

在构造函数中,maximumPoolSize为线程池所能容纳的最首高模大线程数。

在构造函数中,keepAliveTime表示线程闲置超时时长。如果线程闲置时间超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。

在构造函数中,timeUnit表示线程闲置超时时长的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

在构造函数中,blockingQueue表示任务队列,线程池任务队列的常用实现类有:

在构造函数中,threadFactory表示线程工厂。用于指定为线程池创建新线程的方式,threadFactory可以设置线程名称、线程组、优先级等参数。如通过Google工具包可以设置线程池里的线程名:

在构造函数中,rejectedExecutionHandler表示拒绝策略。当达到最大线程数且队列任务已满时需要执行的拒绝策略,常见的拒绝策略如下:


ThreadPoolExecutor线程池有如下几种状态:



线程池提交一个任务时任务调度的主要步骤如下:

核心代码如下:


Tomcat 的整体架构包含连接器和容器两大部分,其中连接器负责与外部通信,容器负责内部逻辑处理。在连接器中:

Tomcat为了实现请者缓求的快念铅速响应,使用线程池来提高请求的处理能力。下面我们以HTTP非阻塞I/O为例对Tomcat线程池进行简要的分析。

在Tomcat中,通过AbstractEndpoint类提供底层的网络I/O的处理,若用户没有配置自定义公共线程池,则AbstractEndpoint通过createExecutor方法来创建Tomcat默认线程池。

核心部分代码如下:

其中,TaskQueue、ThreadPoolExecutor分别为Tomcat自定义任务队列、线程池实现。

Tomcat自定义线程池继承于java.util.concurrent.ThreadPoolExecutor,并新增了一些成员变量来更高效地统计已经提交但尚未完成的任务数量(submittedCount),包括已经在队列中的任务和已经交给工作线程但还未开始执行的任务。

Tomcat在自定义线程池ThreadPoolExecutor中重写了execute()方法,并实现对提交执行的任务进行submittedCount加一。Tomcat在自定义ThreadPoolExecutor中,当线程池抛出RejectedExecutionException异常后,会调用force()方法再次向TaskQueue中进行添加任务的尝试。如果添加失败,则submittedCount减一后,再抛出RejectedExecutionException。

在Tomcat中重新定义了一个阻塞队列TaskQueue,它继承于LinkedBlockingQueue。在Tomcat中,核心线程数默认值为10,最大线程数默认为200, 为了避免线程到达核心线程数后后续任务放入队列等待,Tomcat通过自定义任务队列TaskQueue重写offer方法实现了核心线程池数达到配置数后线程的创建。

具体地,从线程池任务调度机制实现可知,当offer方法返回false时,线程池将尝试创建新新线程,从而实现任务的快速响应。TaskQueue核心实现代码如下:

Tomcat中通过自定义任务线程TaskThread实现对每个线程创建时间的记录;使用静态内部类WrappingRunnable对Runnable进行包装,用于对StopPooledThreadException异常类型的处理。

Executors常用方法有以下几个:

Executors类看起来功能比较强大、用起来还比较方便,但存在如下弊端

使用线程时,可以直接调用 ThreadPoolExecutor 的构造函数来创建线程池,并根据业务实际场景来设置corePoolSize、blockingQueue、RejectedExecuteHandler等参数。

使用局部线程池时,若任务执行完后没有执行shutdown()方法或有其他不当引用,极易造成系统资源耗尽。

在工程实践中,通常使用下述公式来计算核心线程数:

nThreads=(w+c)/c*n*u=(w/c+1)*n*u

其中,w为等待时间,c为计算时间,n为CPU核心数(通常可通过 Runtime.getRuntime().availableProcessors()方法获取),u为CPU目标利用率(取值区间为[0, 1]);在最大化CPU利用率的情况下,当处理的任务为计算密集型任务时,即等待时间w为0,此时核心线程数等于CPU核心数。

上述计算公式是理想情况下的建议核心线程数,而不同系统/应用在运行不同的任务时可能会有一定的差异,因此最佳线程数参数还需要根据任务的实际运行情况和压测表现进行微调。

为了更好地发现、分析和解决问题,建议在使用多线程时增加对异常的处理,异常处理通常有下述方案:

为了实现优雅停机的目标,我们应当先调用shutdown方法,调用这个方法也就意味着,这个线程池不会再接收任何新的任务,但是已经提交的任务还会继续执行。之后我们还应当调用awaitTermination方法,这个方法可以设定线程池在关闭之前的最大超时时间,如果在超时时间结束之前线程池能够正常关闭则会返回true,否则,超时会返回false。通常我们需要根据业务场景预估一个合理的超时时间,然后调用该方法。

如果awaitTermination方法返回false,但又希望尽可能在线程池关闭之后再做其他资源回收工作,可以考虑再调用一下shutdownNow方法,此时队列中所有尚未被处理的任务都会被丢弃,同时会设置线程池中每个线程的中断标志位。shutdownNow并不保证一定可以让正在运行的线程停止工作,除非提交给线程的任务能够正确响应中断。

ThreadLocal线程变量概述

ThreadLocal类提供了线程本地变量(thread-local variables),这些变量不同于普通的变量,访问线程本地变量的每个线程(通过其get或set方法)都有其自己的独立初始化的变量副本,因此ThreadLocal没有多线程竞争的问题,不需要单独进行加锁。

ThreadLocal的原理与实践

对于ThreadLocal而言,常用的方法有get/set/initialValue 3个方法。

众所周知,在java中SimpleDateFormat有线程安全问题,为了安全地使用SimpleDateFormat,除了1)创建SimpleDateFormat局部变量;和2)加同步锁 两种方案外,我们还可以使用3)ThreadLocal的方案:

Thread 内部维护了一个 ThreadLocal.ThreadLocalMap 实例(threadLocals),ThreadLocal 的操作都是围绕着 threadLocals 来操作的。

从JDK源码可见,ThreadLocalMap中的Entry是弱引用类型的,这就意味着如果这个ThreadLocal只被这个Entry引用,而没有被其他对象强引用时,就会在下一次GC的时候回收掉。

EagleEye(鹰眼)作为全链路监控系统在集团内部被广泛使用,traceId、rpcId、压测标等信息存储在EagleEye的ThreadLocal变量中,并在HSF/Dubbo服务调用间进行传递。EagleEye通过Filter将数据初始化到ThreadLocal中,部分相关代码如下:

在EagleEyeFilter中,通过EagleEyeRequestTracer.startTrace方法进行初始化,在前置入参转换后,通过startTrace重载方法将鹰眼上下文参数存入ThreadLocal中,相关代码如下:

EagleEyeFilter在finally代码块中,通过EagleEyeRequestTracer.endTrace方法结束调用链,通过clear方法将ThreadLocal中的数据进行清理,相关代码实现如下:

在某权益领取原有链路中,通过app打开一级页面后才能发起权益领取请求,请求经过淘系无线网关(Mtop)后到达服务端,服务端通过mtop sdk获取当前会话信息。

在XX项目中,对权益领取链路进行了升级改造,在一级页面请求时,通过服务端同时发起权益领取请求。具体地,服务端在处理一级页面请求时,同时通过调用hsf/bbo接口来进行权益领取,因此在发起rpc调用时需要携带用户当前会话信息,在服务提供端将会话信息进行提取并注入到mtop上下文,从而才能通过mtop sdk获取到会话id等信息。某开发同学在实现时,因ThreadLocal使用不当造成下述问题:

【问题1:权益领取失败分析】

在权益领取服务中,该应用构建了一套高效和线程安全的依赖注入框架,基于该框架的业务逻辑模块通常抽象为xxxMole形式,Mole间为网状依赖关系,框架会按依赖关系自动调用init方法(其中,被依赖的mole 的init方法先执行)。

在应用中,权益领取接口的主入口为CommonXXApplyMole类,CommonXXApplyMole依赖XXSessionMole。当请求来临时,会按依赖关系依次调用init方法,因此XXSessionMole的init方法会优先执行;而开发同学在CommonXXApplyMole类中的init方法中通过调用recoverMtopContext()方法来期望恢复mtop上下文,因recoverMtopContext()方法的调用时机过晚,从而导致XXSessionMole模块获取不到正确的会话id等信息而导致权益领取失败。

【问题2:脏数据分析】

权益领取服务在处理请求时,若当前线程曾经处理过权益领取请求,因ThreadLocal变量值未被清理,此时XXSessionMole通过mtop SDK获取会话信息时得到的是前一次请求的会话信息,从而造成脏数据。

【解决方案】

在依赖注入框架入口处AbstractGate#visit(或在XXSessionMole中)通过recoverMtopContext方法注入mtop上下文信息,并在入口方法的finally代码块清理当前请求的threadlocal变量值。

若使用强引用类型,则threadlocal的引用链为:Thread -> ThreadLocal.ThreadLocalMap -> Entry[] -> Entry -> key(threadLocal对象)和value;在这种场景下,只要这个线程还在运行(如线程池场景),若不调用remove方法,则该对象及关联的所有强引用对象都不会被垃圾回收器回收。

若使用static关键字进行修饰,则一个线程仅对应一个线程变量;否则,threadlocal语义变为perThread-perInstance,容易引发内存泄漏,如下述示例:

在上述main方法第22行debug,可见线程的threadLocals变量中有3个threadlocal实例。在工程实践中,使用threadlocal时通常期望一个线程只有一个threadlocal实例,因此,若不使用static修饰,期望的语义发生了变化,同时易引起内存泄漏。

如果不执行清理操作,则可能会出现:

建议使用try...finally 进行清理。

我们在使用ThreadLocal时,通常期望的语义是perThread,若不使用static进行修饰,则语义变为perThread-perInstance;在线程池场景下,若不用static进行修饰,创建的线程相关实例可能会达到 M * N个(其中M为线程数,N为对应类的实例数),易造成内存泄漏(https://errorprone.info/bugpattern/ThreadLocalUsage)。

在应用中,谨慎使用ThreadLocal.withInitial(Supplier<? extends S> supplier)这个工厂方法创建ThreadLocal对象,一旦不同线程的ThreadLocal使用了同一个Supplier对象,那么隔离也就无从谈起了,如:

总结

在java工程实践中,线程池和线程变量被广泛使用,因线程池和线程变量的不当使用经常造成安全生产事故,因此,正确使用线程池和线程变量是每一位开发人员必须修炼的基本功。本文从线程池和线程变量的使用出发,简要介绍了线程池和线程变量的原理和使用实践,各开发人员可结合最佳实践和实际应用场景,正确地使用线程和线程变量,构建出稳定、高效的java应用服务。

Ⅱ 源码修炼笔记之Dubbo线程池策略

FixedThreadPool

FixThreadPool内部是通过ThreadPoolExecutor来创建线程,核心线程数和最大线程数都是上下文中指定的线程数量threads,因为不存在空闲线程所以keepAliveTime为0,
当queues=0,创建SynchronousQueue阻塞队列;
当queues<0,创建无界的阻塞队列LinkedBlockingQueue;
当queues>0,创建有界的阻塞队列LinkedBlockingQueue。
采用bbo自己实现的线程工厂NamedInternalThreadFactory,将线程置为守护线程(Demon)
拒绝策略为AbortPolicyWithReport,策略为将调用时的堆栈信息保存到本地文件中,并抛出异常RejectedExecutionException

CachedThreadPool

CachedThreadPool与FixedThreadPool的区别是核心线程数和最大线程数不相等,通过alive来控制空闲线程的释放

LimitedThreadPool

LimitedThreadPool与CachedThreadPool的区别是空闲线程的超时时间为Long.MAX_VALUE,相当于线程数量不会动态变化了,创建的线程不会被释放。

EagerThreadPool

与上述三种线程池不同,EagerThreadPool并非通过JUC中的ThreadPoolExecutor来创建线程池,而是通过EagerThreadPoolExecutor来创建线程池,EagerThreadPoolExecutor继承自ThreadPoolExecutor,实现自定义的execute方法,采用的阻塞队列是TaskQueue,TaskQueue继承自LinkedBlockingQueue。

execute方法首先调用ThreadPoolExecutor的execute方法,如果执行失败会重新放入TaskQueue进行重试。

实现自定义的ThreadPool

ThreadPool被定义为一个扩展点,如下所示,

其默认实现是FixedThreadPool,可以通过实现该扩展来实现自定义的线程池策略。

Ⅲ Android线程池ThreadPoolExecutor详解

       传统的多线程是通过继承Thread类及实现Runnable接口来实现的,每次创建及销毁线程都会消耗资源、响应速度慢,且线程缺乏统一管理,容易出现阻塞的情况,针对以上缺点,线程池就出现了。

       线程池是一个创建使用线程并能保存使用过的线程以达到复用的对象,简单的说就是一块缓存了一定数量线程的区域。

       1.复用线程:线程执行完不会立刻退出,继续执行其他线程;
       2.管理线程:统一分配、管理、控制最大并发数;

       1.降低因频繁创建&销毁线程带来的性能开销,复用缓存在线程池中的线程;
       2.提高线程执行效率&响应速度,复用线程:响应速度;管理线程:优化线程执行顺序,避免大量线程抢占资源导致阻塞现象;
       3.提高对线程的管理度;

       线程池的使用也比较简单,流程如下:

       接下来通过源码来介绍一下ThreadPoolExecutor内部实现及工作原理。

       线程池的最终实现类是ThreadPoolExecutor,通过实现可以一步一步的看到,父接口为Executor:

       其他的继承及实现关系就不一一列举了,直接通过以下图来看一下:

       从构造方法开始看:

       通过以上可以看到,在创建ThreadPoolExecutor时,对传入的参数是有要求的:corePoolSize不能小于0;maximumPoolSize需要大于0,且需要大于等于corePoolSize;keepAliveTime大于0;workQueue、threadFactory都不能为null。
       在创建完后就需要执行Runnable了,看以下execute()方法:

       在execute()内部主要执行的逻辑如下:
       分析点1:如果当前线程数未超过核心线程数,则将runnable作为参数执行addWorker(),true表示核心线程,false表示非核心线程;
       分析点2:核心线程满了,如果线程池处于运行状态则往workQueue队列中添加任务,接下来判断是否需要拒绝或者执行addWorker();
       分析点3:以上都不满足时 [corePoolSize=0且没有运行的线程,或workQueue已经满了] ,执行addWorker()添加runnable,失败则执行拒绝策略;
        总结一下:线程池对线程创建的管理,流程图如下:

       在执行addWorker时,主要做了以下两件事:
       分析点1:将runnable作为参数创建Worker对象w,然后获取w内部的变量thread;
       分析点2:调用start()来启动thread;
       在addWorker()内部会将runnable作为参数传给Worker,然后从Worker内部读取变量thread,看一下Worker类的实现:

       Worker实现了Runnable接口,在Worker内部,进行了赋值及创建操作,先将execute()时传入的runnable赋值给内部变量firstTask,然后通过ThreadFactory.newThread(this)创建Thread,上面讲到在addWorker内部执行t.start()后,会执行到Worker内部的run()方法,接着会执行runWorker(this),一起看一下:

       前面可以看到,runWorker是执行在子线程内部,主要执行了三件事:
       分析1:获取当前线程,当执行shutdown()时需要将线程interrupt(),接下来从Worker内部取到firstTask,即execute传入的runnable,接下来会执行;
       分析2:while循环,task不空直接执行;否则执行getTask()去获取,不为空直接执行;
       分析3:对有效的task执行run(),由于是在子线程中执行,因此直接run()即可,不需要start();
       前面看到,在while内部有执行getTask(),一起看一下:

       getTask()是从workQueue内部获取接下来需要执行的runnable,内部主要做了两件事:
       分析1:先获取到当前正在执行工作的线程数量wc,通过判断allowCoreThreadTimeOut[在创建ThreadPoolExecutor时可以进行设置]及wc > corePoolSize来确定timed值;
       分析2:通过timed值来决定执行poll()或者take(),如果WorkQueue中有未执行的线程时,两者作用是相同的,立刻返回线程;如果WorkQueue中没有线程时,poll()有超时返回,take()会一直阻塞;如果allowCoreThreadTimeOut为true,则核心线程在超时时间没有使用的话,是需要退出的;wc > corePoolSize时,非核心线程在超时时间没有使用的话,是需要退出的;
       allowCoreThreadTimeOut是可以通过以下方式进行设置的:

       如果没有进行设置,那么corePoolSize数量的核心线程会一直存在。
        总结一下:ThreadPoolExecutor内部的核心线程如何确保一直存在,不退出?
       上面分析已经回答了这个问题,每个线程在执行时会执行runWorker(),而在runWorker()内部有while()循环会判断getTask(),在getTask()内部会对当前执行的线程数量及allowCoreThreadTimeOut进行实时判断,如果工作数量大于corePoolSize且workQueue中没有未执行的线程时,会执行poll()超时退出;如果工作数量不大于corePoolSize且workQueue中没有未执行的线程时,会执行take()进行阻塞,确保有corePoolSize数量的线程阻塞在runWorker()内部的while()循环不退出。
       如果需要关闭线程池,需要如何操作呢,看一下shutdown()方法:

       以上可以看到,关闭线程池的原理:a. 遍历线程池中的所有工作线程;b. 逐个调用线程的interrupt()中断线程(注:无法响应中断的任务可能永远无法终止)
       也可调用shutdownNow()来关闭线程池,二者区别:
       shutdown():设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程;
       shutdownNow():设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;
       使用建议:一般调用shutdown()关闭线程池;若任务不一定要执行完,则调用shutdownNow();
        总结一下:ThreadPoolExecutor在执行execute()及shutdown()时的调用关系,流程图如下:

       线程池可以通过Executors来进行不同类型的创建,具体分为四种不同的类型,如下:

       可缓存线程池:不固定线程数量,且支持最大为Integer.MAX_VALUE的线程数量:

       1、线程数无限制
       2、有空闲线程则复用空闲线程,若无空闲线程则新建线程
       3、一定程度上减少频繁创建/销毁线程,减少系统开销

       固定线程数量的线程池:定长线程池

       1、可控制线程最大并发数(同时执行的线程数)
       2、超出的线程会在队列中等待。

       单线程化的线程池:可以理解为线程数量为1的FixedThreadPool

       1、有且仅有一个工作线程执行任务
       2、所有任务按照指定顺序执行,即遵循队列的入队出队规则

       定时以指定周期循环执行任务

       一般来说,等待队列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 与 SynchronousQueue 。
       假设向线程池提交任务时,核心线程都被占用的情况下:
        ArrayBlockingQueue :基于数组的阻塞队列,初始化需要指定固定大小。
       当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了之后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。所以最终可能出现后提交的任务先执行,而先提交的任务一直在等待。
        LinkedBlockingQueue :基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定。
       当指定大小后,行为就和 ArrayBlockingQueue一致。而如果未指定大小,则会使用默认的 Integer.MAX_VALUE 作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功。最终意味着所有任务都是在核心线程执行。如果核心线程一直被占,那就一直等待。
        SynchronousQueue :无容量的队列。
       使用此队列意味着希望获得最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败。而失败后如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Integer.MAX_VALUE就实现了真正的无等待。
       但是需要注意的是, 进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个。

Ⅳ 并发编程解惑之线程

主要内容:

进程是资源分配的最小单位,每个进程都有独立的代码和数据空间,一个进程包含 1 到 n 个线程。线程是 CPU 调度的最小单位,每个线程有独立的运行栈和程序计数器,线程切换开销小。

Java 程序总是从主类的 main 方法开始执行,main 方法就是 Java 程序默认的主线程,而在 main 方法中再创建的线程就是其他线程。在 Java 中,每次程序启动至少启动 2 个线程。一个是 main 线程,一个是垃圾收集线程。每次使用 Java 命令启动一个 Java 程序,就相当于启动一个 JVM 实例,而每个 JVM 实例就是在操作系统中启动的一个进程。

多线程可以通过继承或实现接口的方式创建。

Thread 类是 JDK 中定义的用于控制线程对象的类,该类中封装了线程执行体 run() 方法。需要强调的一点是,线程执行先后与创建顺序无关。

通过 Runnable 方式创建线程相比通过继承 Thread 类创建线程的优势是避免了单继承的局限性。若一个 boy 类继承了 person 类,boy 类就无法通过继承 Thread 类的方式来实现多线程。

使用 Runnable 接口创建线程的过程:先是创建对象实例 MyRunnable,然后将对象 My Runnable 作为 Thread 构造方法的入参,来构造出线程。对于 new Thread(Runnable target) 创建的使用同一入参目标对象的线程,可以共享该入参目标对象 MyRunnable 的成员变量和方法,但 run() 方法中的局部变量相互独立,互不干扰。

上面代码是 new 了三个不同的 My Runnable 对象,如果只想使用同一个对象,可以只 new 一个 MyRunnable 对象给三个 new Thread 使用。

实现 Runnable 接口比继承 Thread 类所具有的优势:

线程有新建、可运行、阻塞、等待、定时等待、死亡 6 种状态。一个具有生命的线程,总是处于这 6 种状态之一。 每个线程可以独立于其他线程运行,也可和其他线程协同运行。线程被创建后,调用 start() 方法启动线程,该线程便从新建态进入就绪状态。

NEW 状态(新建状态) 实例化一个线程之后,并且这个线程没有开始执行,这个时候的状态就是 NEW 状态:

RUNNABLE 状态(就绪状态):

阻塞状态有 3 种:

如果一个线程调用了一个对象的 wait 方法, 那么这个线程就会处于等待状态(waiting 状态)直到另外一个线程调用这个对象的 notify 或者 notifyAll 方法后才会解除这个状态。

run() 里的代码执行完毕后,线程进入终结状态(TERMINATED 状态)。

线程状态有 6 种:新建、可运行、阻塞、等待、定时等待、死亡。

我们看下 join 方法的使用:

运行结果:

我们来看下 yield 方法的使用:

运行结果:

线程与线程之间是无法直接通信的,A 线程无法直接通知 B 线程,Java 中线程之间交换信息是通过共享的内存来实现的,控制共享资源的读写的访问,使得多个线程轮流执行对共享数据的操作,线程之间通信是通过对共享资源上锁或释放锁来实现的。线程排队轮流执行共享资源,这称为线程的同步。

Java 提供了很多同步操作(也就是线程间的通信方式),同步可使用 synchronized 关键字、Object 类的 wait/notifyAll 方法、ReentrantLock 锁、无锁同步 CAS 等方式来实现。

ReentrantLock 是 JDK 内置的一个锁对象,用于线程同步(线程通信),需要用户手动释放锁。

运行结果:

这表明同一时间段只能有 1 个线程执行 work 方法,因为 work 方法里的代码需要获取到锁才能执行,这就实现了多个线程间的通信,线程 0 获取锁,先执行,线程 1 等待,线程 0 释放锁,线程 1 继续执行。

synchronized 是一种语法级别的同步方式,称为内置锁。该锁会在代码执行完毕后由 JVM 释放。

输出结果跟 ReentrantLock 一样。

Java 中的 Object 类默认是所有类的父类,该类拥有 wait、 notify、notifyAll 方法,其他对象会自动继承 Object 类,可调用 Object 类的这些方法实现线程间的通信。

除了可以通过锁的方式来实现通信,还可通过无锁的方式来实现,无锁同 CAS(Compare-and-Swap,比较和交换)的实现,需要有 3 个操作数:内存地址 V,旧的预期值 A,即将要更新的目标值 B,当且仅当内存地址 V 的值与预期值 A 相等时,将内存地址 V 的值修改为目标值 B,否则就什么都不做。

我们通过计算器的案例来演示无锁同步 CAS 的实现方式,非线程安全的计数方式如下:

线程安全的计数方式如下:

运行结果:

线程安全累加的结果才是正确的,非线程安全会出现少计算值的情况。JDK 1.5 开始,并发包里提供了原子操作的类,AtomicBoolean 用原子方式更新的 boolean 值,AtomicInteger 用原子方式更新 int 值,AtomicLong 用原子方式更新 long 值。 AtomicInteger 和 AtomicLong 还提供了用原子方式将当前值自增 1 或自减 1 的方法,在多线程程序中,诸如 ++i 或 i++ 等运算不具有原子性,是不安全的线程操作之一。 通常我们使用 synchronized 将该操作变成一个原子操作,但 JVM 为此种操作提供了原子操作的同步类 Atomic,使用 AtomicInteger 做自增运算的性能是 ReentantLock 的好几倍。

上面我们都是使用底层的方式实现线程间的通信的,但在实际的开发中,我们应该尽量远离底层结构,使用封装好的 API,例如 J.U.C 包(java.util.concurrent,又称并发包)下的工具类 CountDownLath、CyclicBarrier、Semaphore,来实现线程通信,协调线程执行。

CountDownLatch 能够实现线程之间的等待,CountDownLatch 用于某一个线程等待若干个其他线程执行完任务之后,它才开始执行。

CountDownLatch 类只提供了一个构造器:

CountDownLatch 类中常用的 3 个方法:

运行结果:

CyclicBarrier 字面意思循环栅栏,通过它可以让一组线程等待至某个状态之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier 可以被重复使用,所以有循环之意。

相比 CountDownLatch,CyclicBarrier 可以被循环使用,而且如果遇到线程中断等情况时,可以利用 reset() 方法,重置计数器,CyclicBarrier 会比 CountDownLatch 更加灵活。

CyclicBarrier 提供 2 个构造器:

上面的方法中,参数 parties 指让多少个线程或者任务等待至 barrier 状态;参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容。

CyclicBarrier 中最重要的方法 await 方法,它有 2 个重载版本。下面方法用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任务。

而下面的方法则是让这些线程等待至一定的时间,如果还有线程没有到达 barrier 状态就直接让到达 barrier 的线程执行任务。

运行结果:

CyclicBarrier 用于一组线程互相等待至某个状态,然后这一组线程再同时执行,CountDownLatch 是不能重用的,而 CyclicBarrier 可以重用。

Semaphore 类是一个计数信号量,它可以设定一个阈值,多个线程竞争获取许可信号,执行完任务后归还,超过阈值后,线程申请许可信号时将会被阻塞。Semaphore 可以用来 构建对象池,资源池,比如数据库连接池。

假如在服务器上运行着若干个客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻只能获得一定数目的数据库连接。要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程呢?

给方法加同步锁,保证同一时刻只能有一个线程去调用此方法,其他所有线程排队等待,但若有 10 个数据库连接,也只有一个能被使用,效率太低。另外一种方法,使用信号量,让信号量许可与数据库可用连接数为相同数量,10 个数据库连接都能被使用,大大提高性能。

上面三个工具类是 J.U.C 包的核心类,J.U.C 包的全景图就比较复杂了:

J.U.C 包(java.util.concurrent)中的高层类(Lock、同步器、阻塞队列、Executor、并发容器)依赖基础类(AQS、非阻塞数据结构、原子变量类),而基础类是通过 CAS 和 volatile 来实现的。我们尽量使用顶层的类,避免使用基础类 CAS 和 volatile 来协调线程的执行。J.U.C 包其他的内容,在其他的篇章会有相应的讲解。

Future 是一种异步执行的设计模式,类似 ajax 异步请求,不需要同步等待返回结果,可继续执行代码。使 Runnable(无返回值不支持上报异常)或 Callable(有返回值支持上报异常)均可开启线程执行任务。但是如果需要异步获取线程的返回结果,就需要通过 Future 来实现了。

Future 是位于 java.util.concurrent 包下的一个接口,Future 接口封装了取消任务,获取任务结果的方法。

在 Java 中,一般是通过继承 Thread 类或者实现 Runnable 接口来创建多线程, Runnable 接口不能返回结果,JDK 1.5 之后,Java 提供了 Callable 接口来封装子任务,Callable 接口可以获取返回结果。我们使用线程池提交 Callable 接口任务,将返回 Future 接口添加进 ArrayList 数组,最后遍历 FutureList,实现异步获取返回值。

运行结果:

上面就是异步线程执行的调用过程,实际开发中用得更多的是使用现成的异步框架来实现异步编程,如 RxJava,有兴趣的可以继续去了解,通常异步框架都是结合远程 HTTP 调用 Retrofit 框架来使用的,两者结合起来用,可以避免调用远程接口时,花费过多的时间在等待接口返回上。

线程封闭是通过本地线程 ThreadLocal 来实现的,ThreadLocal 是线程局部变量(local vari able),它为每个线程都提供一个变量值的副本,每个线程对该变量副本的修改相互不影响。

在 JVM 虚拟机中,堆内存用于存储共享的数据(实例对象),也就是主内存。Thread Local .set()、ThreadLocal.get() 方法直接在本地内存(工作内存)中写和读共享变量的副本,而不需要同步数据,不用像 synchronized 那样保证数据可见性,修改主内存数据后还要同步更新到工作内存。

Myabatis、hibernate 是通过 threadlocal 来存储 session 的,每一个线程都维护着一个 session,对线程独享的资源操作很方便,也避免了线程阻塞。

ThreadLocal 类位于 Thread 线程类内部,我们分析下它的源码:

ThreadLocal 和 Synchonized 都用于解决多线程并发访问的问题,访问多线程共享的资源时,Synchronized 同步机制采用了以时间换空间的方式,提供一份变量让多个线程排队访问,而 ThreadLocal 采用了以空间换时间的方式,提供每个线程一个变量,实现数据隔离。

ThreadLocal 可用于数据库连接 Connection 对象的隔离,使得每个请求线程都可以复用连接而又相互不影响。

在 Java 里面,存在强引用、弱引用、软引用、虚引用。我们主要来了解下强引用和弱引用:

上面 a、b 对实例 A、B 都是强引用

而上面这种情况就不一样了,即使 b 被置为 null,但是 c 仍然持有对 C 对象实例的引用,而间接的保持着对 b 的强引用,所以 GC 不会回收分配给 b 的空间,导致 b 无法回收也没有被使用,造成了内存泄漏。这时可以通过 c = null; 来使得 c 被回收,但也可以通过弱引用来达到同样目的:

从源码中可以看出 Entry 里的 key 对 ThreadLocal 实例是弱引用:

Entry 里的 key 对 ThreadLocal 实例是弱引用,将 key 值置为 null,堆中的 ThreadLocal 实例是可以被垃圾收集器(GC)回收的。但是 value 却存在一条从 Current Thread 过来的强引用链,只有当当前线程 Current Thread 销毁时,value 才能被回收。在 threadLocal 被设为 null 以及线程结束之前,Entry 的键值对都不会被回收,出现内存泄漏。为了避免泄漏,在 ThreadLocalMap 中的 set/get Entry 方法里,会对 key 为 null 的情况进行判断,如果为 null 的话,就会对 value 置为 null。也可以通过 ThreadLocal 的 remove 方法(类似加锁和解锁,最后 remove 一下,解锁对象的引用)直接清除,释放内存空间。

总结来说,利用 ThreadLocal 来访问共享数据时,JVM 通过设置 ThreadLocalMap 的 Key 为弱引用,来避免内存泄露,同时通过调用 remove、get、set 方法的时候,回收弱引用(Key 为 null 的 Entry)。当使用 static ThreadLocal 的时候(如上面的 Spring 多数据源),static 变量在类未加载的时候,它就已经加载,当线程结束的时候,static 变量不一定会被回收,比起普通成员变量使用的时候才加载,static 的生命周期变长了,若没有及时回收,容易产生内存泄漏。

使用线程池,可以重用存在的线程,减少对象创建、消亡的开销,可控制最大并发线程数,避免资源竞争过度,还能实现线程定时执行、单线程执行、固定线程数执行等功能。

Java 把线程的调用封装成了一个 Executor 接口,Executor 接口中定义了一个 execute 方法,用来提交线程的执行。Executor 接口的子接口是 ExecutorService,负责管理线程的执行。通过 Executors 类的静态方法可以初始化

ExecutorService 线程池。Executors 类的静态方法可创建不同类型的线程池:

但是,不建议使用 Executors 去创建线程池,而是通过 ThreadPoolExecutor 的方式,明确给出线程池的参数去创建,规避资源耗尽的风险。

如果使用 Executors 去创建线程池:

最佳的实践是通过 ThreadPoolExecutor 手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。通过线程池实现类 ThreadPoolExecutor 可构造出线程池的,构造函数有下面几个重要的参数:

参数 1:corePoolSize

线程池核心线程数。

参数 2:workQueue

阻塞队列,用于保存执行任务的线程,有 4 种阻塞队列可选:

参数 3:maximunPoolSize

线程池最大线程数。如果阻塞队列满了(有界的阻塞队列),来了一个新的任务,若线程池当前线程数小于最大线程数,则创建新的线程执行任务,否则交给饱和策略处理。如果是无界队列就不存在这种情况,任务都在无界队列里存储着。

参数 4:RejectedExecutionHandler

拒绝策略,当队列满了,而且线程达到了最大线程数后,对新任务采取的处理策略。

有 4 种策略可选:

最后,还可以自定义处理策略。

参数 5:ThreadFactory

创建线程的工厂。

参数 6:keeyAliveTime

线程没有任务执行时最多保持多久时间终止。当线程池中的线程数大于 corePoolSize 时,线程池中所有线程中的某一个线程的空闲时间若达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但如果调用了 allowCoreThread TimeOut(boolean value) 方法,线程池中的线程数就算不超过 corePoolSize,keepAlive Time 参数也会起作用,直到线程池中的线程数量变为 0。

参数 7:TimeUnit

配合第 6 个参数使用,表示存活时间的时间单位最佳的实践是通过 ThreadPoolExecutor 手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。

运行结果:

线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会不断的去获取队列里的任务来执行。Worker 的加锁解锁机制是继承 AQS 实现的。

我们来看下 Worker 线程的运行过程:

总结来说,如果当前运行的线程数小于 corePoolSize 线程数,则获取全局锁,然后创建新的线程来执行任务如果运行的线程数大于等于 corePoolSize 线程数,则将任务加入阻塞队列 BlockingQueue 如果阻塞队列已满,无法将任务加入 BlockingQueue,则获取全局所,再创建新的线程来执行任务

如果新创建线程后使得线程数超过了 maximumPoolSize 线程数,则调用 Rejected ExecutionHandler.rejectedExecution() 方法根据对应的拒绝策略处理任务。

CPU 密集型任务,线程执行任务占用 CPU 时间会比较长,应该配置相对少的线程数,避免过度争抢资源,可配置 N 个 CPU+1 个线程的线程池;但 IO 密集型任务则由于需要等待 IO 操作,线程经常处于等待状态,应该配置相对多的线程如 2*N 个 CPU 个线程,A 线程阻塞后,B 线程能马上执行,线程多竞争激烈,能饱和的执行任务。线程提交 SQL 后等待数据库返回结果时间较长的情况,CPU 空闲会较多,线程数应设置大些,让更多线程争取 CPU 的调度。

Ⅳ 【Java基础】线程池的原理是什么

什么是线程池?

总归为:池化技术 ---》数据库连接池 缓存架构 缓存池 线程池 内存池,连接池,这种思想演变成缓存架构技术---> JDK设计思想有千丝万缕的联系

首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。

Java 中的 ThreadPoolExecutor 类

java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类,因此如果要透彻地了解Java 中的线程池,必须先了解这个类。下面我们来看一下 ThreadPoolExecutor 类的具体实现源码。

在 ThreadPoolExecutor 类中提供了四个构造方法:

阅读全文

与线程池源码分析相关的资料

热点内容
linux查看文件权限命令 浏览:685
安卓手游存档怎么用 浏览:761
linuxyum安装ftp 浏览:690
村委会主任可以推行政命令吗 浏览:102
电脑文件夹封面多张图片 浏览:263
网吧总服务器叫什么 浏览:922
多个算法解决同一个问题 浏览:455
小车解压后我的购车发票呢 浏览:977
做app开发用什么云服务器 浏览:177
linux网卡子接口 浏览:985
21岁职高毕业学程序员怎么学 浏览:321
vs如何对单个文件编译 浏览:6
为什么有的电脑不能安装python 浏览:75
金蝶迷你版加密狗检测到过期 浏览:186
硬件描述语言编译结果 浏览:655
程序员逆天改命 浏览:19
金斗云服务器 浏览:447
港口工程pdf 浏览:770
程序设计语言pdf 浏览:434
蔬菜价格上涨算法 浏览:221