导图社区 JUC思维导图
JUC思维导图,包括Lock框架、Collections、Tools类、Atomic原子类、CompletableFuture、CompletionService等等。
编辑于2022-07-11 14:18:53JUC
Lock框架

类结构总览
管程
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题 再造管程的理由? synchronized不能破坏不可抢占条件 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
接口: Condition
Condition为接口类型,它将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。可以通过await(),signal()来休眠/唤醒线程。
接口: Lock
Lock为接口类型,Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition对象。
接口: ReadWriteLock
适用于读多写少场景 读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则: 1.允许多个线程同时读共享变量 2.只允许一个线程写共享变量 3.如果一个写线程正在执行写操作,此时禁止读线程读共享变量
抽象类: AbstractOwnableSynchonizer
抽象类(long): AbstractQueuedLongSynchronizer
核心抽象类(int): (AQS) AbstractQueuedSynchonizer
AbstractQueuedSynchronizer简称AQS,是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。
带着问题理解
AbstractQueuedSynchronizer简介
AQS核心思想
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。 (CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。) AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。 状态信息通过procted类型的getState,setState,compareAndSetState进行操作
AQS对资源的共享方式
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。
Exclusive(独占)
只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
公平锁
按照线程在队列中的排队顺序,先到者先拿到锁
非公平锁
当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
Share(共享)
多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到
AQS底层使用了模板方法模式
AbstractQueuedSynchronizer数据结构
Exclusive(独占)
AbstractQueuedSynchronizer源码分析
AbstractQueuedSynchronizer示例详解一
AbstractQueuedSynchronizer示例详解二
AbstractQueuedSynchronizer总结
锁常用类: LockSupport
带着问题理解LockSupport
LockSupport简介
LockSupport源码分析
类的属性
类的构造函数
核心函数分析
park函数
parkNanos函数
parkUntil函数
unpark函数
LockSupport示例说明
使用wait/notify实现线程同步
使用park/unpark实现线程同步
中断响应
更深入的理解
Thread.sleep()和Object.wait()的区别
Thread.sleep()和Condition.await()的区别
Thread.sleep()和LockSupport.park()的区别
Object.wait()和LockSupport.park()的区别
LockSupport.park()会释放锁资源吗?
锁常用类: ReentrantLock
ReenTrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态是我们去分析和理解锁设计的关键钥匙。
带着问题理解ReentrantLock
Java中synchronized和ReentrantLock有什么不同?
ReentrantLock源码分析
示例分析
锁常用类: ReentrantReadWriteLock
带着问题理解ReentrantReadWriteLock
ReentrantReadWriteLock数据结构
ReentrantReadWriteLock源码分析
ReentrantReadWriteLock示例
锁常用类: StampedLock
ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。 而 StampedLock 支持三种模式,分别是:写锁、悲观读锁和乐观读。
Collections
类结构关系
Queue: ArrayBlockingQueue
Queue: LinkedBlockingQueue
Queue: LinkedBlockingDeque
Queue: ConcurrentLinkedQueue
带着问题理解
ConcurrentLinkedQueue数据结构
ConcurrentLinkedQueue源码分析
ConcurrentLinkedQueue示例
再深入理解
Queue: ConcurrentLinkedDeque
Queue: DelayQueue
Queue: PriorityBlockingQueue
Queue: SynchronousQueue
Queue: LinkedTransferQueue
List: CopyOnWriteArrayList
带着问题理解
CopyOnWriteArrayList源码分析
CopyOnWriteArrayList示例
更深入理解
Set: CopyOnWriteArraySet
Map: ConcurrentHashMap
带着问题理解
为什么HashTable慢
ConcurrentHashMap - JDK 1.7
ConcurrentHashMap - JDK 1.8
对比总结
Set: ConcurrentSkipListSet
Map: ConcurrentSkipListMap
Tools类
工具常用类: CountDownLatch
允许一个或多个线程等待其他线程完成操作
带着问题理解
CountDownLatch介绍
CountDownLatch源码分析
CountDownLatch示例
更深入理解
工具常用类: CyclicBarrier
CyclicBarrier 是一组线程之间互相等待 CountDownLatch和CyclicBarrier都有让多个线程等待同步然后再开始下一步动作的意思,但是CountDownLatch的下一步的动作实施者是主线程,具有不可重复性;而CyclicBarrier的下一步动作实施者还是“其他线程”本身,具有往复多次实施动作的特点
带着问题理解
CyclicBarrier简介
CyclicBarrier源码分析
CyclicBarrier示例
和CountDonwLatch再对比
工具常用类: Phaser
带着问题理解
Phaser运行机制
Phaser源码详解
工具常用类: Semaphore
Semaphore就是一个信号量,它的作用是限制某段代码块的并发数。 Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问, 如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。 由此可以看出如果Semaphore构造函数中传入的int型整数n=1,相当于变成了一个synchronized了。
带着问题理解
Semaphore源码分析
Semaphore示例
更深入理解
使用场景
使用场景: 单机版的限流器
工具常用类: Exchanger
带着问题理解
Exchanger简介
Exchanger实现机制
Exchanger源码解析
Atomic原子类
CAS
带着问题来理解
什么是CAS
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试 CAS是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,经过调查发现,其实现方式是基于硬件平台的汇编指令,就是说CAS是靠硬件实现的,JVM只是封装了汇编调用,那些AtomicInteger类便是使用了这些封装后的接口。 简单解释:CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先比较下在旧值有没有发生变化,如果没有发生变化,才交换成新值,发生了变化则不交换。 CAS操作是原子性的,所以多线程并发使用CAS更新数据时,可以不使用锁。JDK中大量使用了CAS来更新数据而防止加锁(synchronized 重量级锁)来保持原子更新。
CAS使用示例
如果不使用CAS,在高并发下,多线程同时修改一个变量的值我们需要synchronized加锁(可能有人说可以用Lock加锁,Lock底层的AQS也是基于CAS进行获取锁的)。 public class Test { private int i=0; public synchronized int add(){ return i++; } } java中为我们提供了AtomicInteger 原子类(底层基于CAS进行更新数据的),不需要加锁就在多线程并发场景下实现数据的一致性 public class Test { private AtomicInteger i = new AtomicInteger(0); public int add(){ return i.addAndGet(1); } }
CAS问题
CAS 方式为乐观锁,synchronized 为悲观锁。因此使用 CAS 解决并发问题通常情况下性能更优。 但是会有几个问题
ABA问题
因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。 ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。 从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令,那么效率会有一定的提升。 pause指令有两个作用:第一,它可以延迟流水线执行命令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零; 第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起CPU流水线被清空(CPU Pipeline Flush),从而提高CPU的执行效率。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。 还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。 从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
优缺点
使用CAS在线程冲突严重时,会大幅降低程序性能;CAS只适合于线程冲突较少的情况使用。 synchronized在jdk1.6之后,已经改进优化。synchronized的底层实现主要依靠Lock-Free的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和CAS类似的性能;而线程冲突严重的情况下,性能远高于CAS
UnSafe类详解
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源、自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。 这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。
UnSafe与CAS
UnSafe底层
UnSafe其他功能
AtomicInteger
使用举例
源码解析
延申到所有原子类:13个
原子更新基本类型
原子更新数组
原子更新引用类型
原子更新字段类
再讲讲AtomicStampedReference 解决CAS的ABA问题
AtomicStampedReference
使用举例
java中还有哪些类可以解决ABA问题
CompletableFuture
 对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决; 1.无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注; 2.语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”; 3.代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
一、创建异步任务
runAsync
简单的异步执行一个线程,但是它将返回一个CompletableFuture,有了这个CompletableFuture,可以重新组装和调配,这是和一个普通Runnable不同之处。
supplyAsync
它表⽰这个异步任务有返回值
二、异步回调
thenApply
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中
thenApplyAsync
区别
thenApply:当前任务的线程继续执行“thenApply”的任务。 thenApplyAsync:把“thenApplyAsync”这个任务继续交给线程池来进行执行。
thenAccept
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值
thenRun
thenRun 的方法没有入参,也买有返回值
exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果
whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常
handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了
三、组合处理
thenCombine / thenAcceptBoth / runAfterBoth
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务
allOf / anyOf
allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。  这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。 UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。 BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
设计思想
CompletionService
而批量的并行任务,则可以通过 CompletionService 来解决。 当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单
ThreadLocal详解
带着BAT大厂的面试问题去理解
什么是ThreadLocal? 用来解决什么问题的? 说说你对ThreadLocal的理解 ThreadLocal是如何实现线程隔离的? 为什么ThreadLocal会造成内存泄露? 如何解决 还有哪些使用ThreadLocal的应用场景?
ThreadLocal简介
我们在Java 并发 - 并发理论基础总结过线程安全(是指广义上的共享资源访问安全性,因为线程隔离是通过副本保证本线程访问资源安全性,它不保证线程之间还存在共享关系的狭义上的安全性)的解决思路: 互斥同步: synchronized 和 ReentrantLock 非阻塞同步: CAS, AtomicXXXX 无同步方案: 栈封闭,本地存储(Thread Local),可重入代码 ThreadLocal是一个将在多线程中为每一个线程创建单独的变量副本的类; 当使用ThreadLocal来维护变量时, ThreadLocal会为每个线程创建单独的变量副本, 避免因多线程操作共享变量而导致的数据不一致的情况。
ThreadLocal理解
提到ThreadLocal被提到应用最多的是session管理和数据库链接管理,这里以数据访问为例帮助你理解ThreadLocal: 如下数据库管理类在单线程使用是没有任何问题的 class ConnectionManager { private static Connection connect = null; public static Connection openConnection() { if (connect == null) { connect = DriverManager.getConnection(); } return connect; } public static void closeConnection() { if (connect != null) connect.close(); } } 很显然,在多线程中使用会存在线程安全问题: 第一,这里面的2个方法都没有进行同步,很可能在openConnection方法中会多次创建connect; 第二,由于connect是共享变量,那么必然在调用connect的地方需要使用到同步来保障线程安全,因为很可能一个线程在使用connect进行数据库操作,而另外一个线程调用closeConnection关闭链接。 为了解决上述线程安全的问题,第一考虑:互斥同步 你可能会说,将这段代码的两个方法进行同步处理,并且在调用connect的地方需要进行同步处理,比如用Synchronized或者ReentrantLock互斥锁。 这里再抛出一个问题:这地方到底需不需要将connect变量进行共享? 事实上,是不需要的。假如每个线程中都有一个connect变量,各个线程之间对connect变量的访问实际上是没有依赖关系的,即一个线程不需要关心其他线程是否对这个connect进行了修改的。即改后的代码可以这样: class ConnectionManager { private Connection connect = null; public Connection openConnection() { if (connect == null) { connect = DriverManager.getConnection(); } return connect; } public void closeConnection() { if (connect != null) connect.close(); } } class Dao { public void insert() { ConnectionManager connectionManager = new ConnectionManager(); Connection connection = connectionManager.openConnection(); // 使用connection进行操作 connectionManager.closeConnection(); } } 这样处理确实也没有任何问题,由于每次都是在方法内部创建的连接,那么线程之间自然不存在线程安全问题。但是这样会有一个致命的影响:导致服务器压力非常大,并且严重影响程序执行性能。由于在方法中需要频繁地开启和关闭数据库连接,这样不仅严重影响程序执行效率,还可能导致服务器压力巨大。 这时候ThreadLocal登场了 那么这种情况下使用ThreadLocal是再适合不过的了,因为ThreadLocal在每个线程中对该变量会创建一个副本,即每个线程内部都会有一个该变量,且在线程内部任何地方都可以使用,线程之间互不影响,这样一来就不存在线程安全问题,也不会严重影响程序执行性能。下面就是网上出现最多的例子: import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class ConnectionManager { private static final ThreadLocal<Connection> dbConnectionLocal = new ThreadLocal<Connection>() { @Override protected Connection initialValue() { try { return DriverManager.getConnection("", "", ""); } catch (SQLException e) { e.printStackTrace(); } return null; } }; public Connection getConnection() { return dbConnectionLocal.get(); } } 再注意下ThreadLocal的修饰符 ThreaLocal的JDK文档中说明:ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread。如果我们希望通过某个类将状态(例如用户ID、事务ID)与线程关联起来,那么通常在这个类中定义private static类型的ThreadLocal 实例。 但是要注意,虽然ThreadLocal能够解决上面说的问题,但是由于在每个线程中都创建了副本,所以要考虑它对资源的消耗,比如内存的占用会比不使用ThreadLocal要大。
ThreadLocal原理
如何实现线程隔离
主要是用到了Thread对象中的一个ThreadLocalMap类型的变量threadLocals, 负责存储当前线程的关于Connection的对象, dbConnectionLocal(以上述例子中为例) 这个变量为Key, 以新建的Connection对象为Value; 这样的话, 线程第一次读取的时候如果不存在就会调用ThreadLocal的initialValue方法创建一个Connection对象并且返回; 具体关于为线程分配变量副本的代码如下: public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } 首先获取当前线程对象t, 然后从线程t中获取到ThreadLocalMap的成员属性threadLocals 如果当前线程的threadLocals已经初始化(即不为null) 并且存在以当前ThreadLocal对象为Key的值, 则直接返回当前线程要获取的对象(本例中为Connection); 如果当前线程的threadLocals已经初始化(即不为null)但是不存在以当前ThreadLocal对象为Key的的对象, 那么重新创建一个Connection对象, 并且添加到当前线程的threadLocals Map中,并返回 如果当前线程的threadLocals属性还没有被初始化, 则重新创建一个ThreadLocalMap对象, 并且创建一个Connection对象并添加到ThreadLocalMap对象中并返回。 如果存在则直接返回很好理解, 那么对于如何初始化的代码又是怎样的呢? private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } 首先调用我们上面写的重载过后的initialValue方法, 产生一个Connection对象 继续查看当前线程的threadLocals是不是空的, 如果ThreadLocalMap已被初始化, 那么直接将产生的对象添加到ThreadLocalMap中, 如果没有初始化, 则创建并添加对象到其中; 同时, ThreadLocal还提供了直接操作Thread对象中的threadLocals的方法 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } 这样我们也可以不实现initialValue, 将初始化工作放到DBConnectionFactory的getConnection方法中: public Connection getConnection() { Connection connection = dbConnectionLocal.get(); if (connection == null) { try { connection = DriverManager.getConnection("", "", ""); dbConnectionLocal.set(connection); } catch (SQLException e) { e.printStackTrace(); } } return connection; } 那么我们看过代码之后就很清晰的知道了为什么ThreadLocal能够实现变量的多线程隔离了; 其实就是用了Map的数据结构给当前线程缓存了, 要使用的时候就从本线程的threadLocals对象中获取就可以了, key就是当前线程; 当然了在当前线程下获取当前线程里面的Map里面的对象并操作肯定没有线程并发问题了, 当然能做到变量的线程间隔离了; 现在我们知道了ThreadLocal到底是什么了, 又知道了如何使用ThreadLocal以及其基本实现原理了是不是就可以结束了呢? 其实还有一个问题就是ThreadLocalMap是个什么对象, 为什么要用这个对象呢?
ThreadLocalMap对象是什么
本质上来讲, 它就是一个Map, 但是这个ThreadLocalMap与我们平时见到的Map有点不一样 它没有实现Map接口; 它没有public的方法, 最多有一个default的构造方法, 因为这个ThreadLocalMap的方法仅仅在ThreadLocal类中调用, 属于静态内部类 ThreadLocalMap的Entry实现继承了WeakReference<ThreadLocal<?>> 该方法仅仅用了一个Entry数组来存储Key, Value; Entry并不是链表形式, 而是每个bucket里面仅仅放一个Entry; 要了解ThreadLocalMap的实现, 我们先从入口开始, 就是往该Map中添加一个值: private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } 先进行简单的分析, 对该代码表层意思进行解读: 看下当前threadLocal的在数组中的索引位置 比如: i = 2, 看 i = 2 位置上面的元素(Entry)的Key是否等于threadLocal 这个 Key, 如果等于就很好说了, 直接将该位置上面的Entry的Value替换成最新的就可以了; 如果当前位置上面的 Entry 的 Key为空, 说明ThreadLocal对象已经被回收了, 那么就调用replaceStaleEntry 如果清理完无用条目(ThreadLocal被回收的条目)、并且数组中的数据大小 > 阈值的时候对当前的Table进行重新哈希 所以, 该HashMap是处理冲突检测的机制是向后移位, 清除过期条目 最终找到合适的位置; 了解完Set方法, 后面就是Get方法了: private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } 先找到ThreadLocal的索引位置, 如果索引位置处的entry不为空并且键与threadLocal是同一个对象, 则直接返回; 否则去后面的索引位置继续查找。
ThreadLocal造成内存泄露的问题
网上有这样一个例子: import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadLocalDemo { static class LocalVariable { private Long[] a = new Long[1024 * 1024]; } // (1) final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); // (2) final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>(); public static void main(String[] args) throws InterruptedException { // (3) Thread.sleep(5000 * 4); for (int i = 0; i < 50; ++i) { poolExecutor.execute(new Runnable() { public void run() { // (4) localVariable.set(new LocalVariable()); // (5) System.out.println("use local varaible" + localVariable.get()); localVariable.remove(); } }); } // (6) System.out.println("pool execute over"); } } 如果用线程池来操作ThreadLocal 对象确实会造成内存泄露, 因为对于线程池里面不会销毁的线程, 里面总会存在着<ThreadLocal, LocalVariable>的强引用, 因为final static 修饰的 ThreadLocal 并不会释放, 而ThreadLocalMap 对于 Key 虽然是弱引用, 但是强引用不会释放, 弱引用当然也会一直有值, 同时创建的LocalVariable对象也不会释放, 就造成了内存泄露; 如果LocalVariable对象不是一个大对象的话, 其实泄露的并不严重, 泄露的内存 = 核心线程数 * LocalVariable对象的大小; 所以, 为了避免出现内存泄露的情况, ThreadLocal提供了一个清除线程中对象的方法, 即 remove, 其实内部实现就是调用 ThreadLocalMap 的remove方法: private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } 找到Key对应的Entry, 并且清除Entry的Key(ThreadLocal)置空, 随后清除过期的Entry即可避免内存泄露
再看ThreadLocal应用场景
再回想上文说的,如果我们希望通过某个类将状态(例如用户ID、事务ID)与线程关联起来,那么通常在这个类中定义private static类型的ThreadLocal 实例。
每个线程维护了一个“序列号”
public class SerialNum { // The next serial number to be assigned private static int nextSerialNum = 0; private static ThreadLocal serialNum = new ThreadLocal() { protected synchronized Object initialValue() { return new Integer(nextSerialNum++); } }; public static int get() { return ((Integer) (serialNum.get())).intValue(); } }
Session的管理
private static final ThreadLocal threadSession = new ThreadLocal(); public static Session getSession() throws InfrastructureException { Session s = (Session) threadSession.get(); try { if (s == null) { s = getSessionFactory().openSession(); threadSession.set(s); } } catch (HibernateException ex) { throw new InfrastructureException(ex); } return s; }
在线程内部创建ThreadLocal
还有一种用法是在线程类内部创建ThreadLocal,基本步骤如下: 在多线程的类(如ThreadDemo类)中,创建一个ThreadLocal对象threadXxx,用来保存线程间需要隔离处理的对象xxx。 在ThreadDemo类中,创建一个获取要隔离访问的数据的方法getXxx(),在方法中判断,若ThreadLocal对象为null时候,应该new()一个隔离访问类型的对象,并强制转换为要应用的类型。 在ThreadDemo类的run()方法中,通过调用getXxx()方法获取要操作的数据,这样可以保证每个线程对应一个数据对象,在任何时刻都操作的是这个对象。
java 开发手册中推荐的 ThreadLocal
import java.text.DateFormat; import java.text.SimpleDateFormat; public class DateUtils { public static final ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>(){ @Override protected DateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd"); } }; } DateUtils.df.get().format(new Date());
核心: Fork/Join框架
是为了去解决分治问题 指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解
带着问题理解
Fork/Join框架简介
Fork/Join类关系
Fork/Join框架源码解析
Fork/Join框架源码解析
Fork/Join的陷阱与注意事项
再深入理解
一些Fork/Join例子
Executors
什么是Executor 控制多线程执行的框架 无限制的创建线程会引起应用程序内存溢出。所以创建一个线程池是个更好的的解决方案,因为可以限制线程的数量并且可以回收再利用这些线程。 利用Executors框架可以非常方便的创建一个线程池, Java通过Executors提供四种线程池,分别为: newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
类结构关系
接口: Executor
ExecutorService
ScheduledExecutorService
AbstractExecutorService
FutureTask
当我们需要获取线程的执行结果时,就需要用到它们。Callable用于产生结果,Future用于获取结果。
带着问题理解FutureTask
FutureTask简介
FutureTask类关系
FutureTask源码解析
FutureTask示例
核心: ThreadPoolExecutor
带着问题理解
为什么要有线程池
线程池能够对线程进行统一分配,调优和监控: 降低资源消耗(线程无限制地创建,然后使用完毕后销毁) 提高响应速度(无须创建线程) 提高线程的可管理性
ThreadPoolExecutor例子
Java是如何实现和管理线程池的? 从JDK 5开始,把工作单元与执行机制分离开来,工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
ThreadPoolExecutor使用详解
其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。 
Execute原理
当一个任务提交至线程池之后: 1.线程池首先判断当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2. 2.判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入3. 3.如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。 当ThreadPoolExecutor创建新线程时,通过CAS来更新线程池的状态ctl
参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
workQueue
用来保存等待被执行的任务的阻塞队列. ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO排序任务; LinkedBlockingQuene: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene; SynchronousQuene: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene; PriorityBlockingQuene: 具有优先级的无界阻塞队列; LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer().
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue.
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止;
unit
keepAliveTime的单位
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略: AbortPolicy: 直接抛出异常,默认策略; CallerRunsPolicy: 用调用者所在的线程来执行任务; DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务; DiscardPolicy: 直接丢弃任务; 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
线程池核心线程数的计算公式
计算密集型 单个线程池 = CPU 核数 + 1 多个线程池 = CPU 核数 / 线程池个数 I/O 密集型 单个线程池 = CPU 核数的两倍 多个线程池 = 多个线程池核心线程数之和 = CPU 核数的两倍
三种类型
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程。 FixedThreadPool的工作队列为无界队列LinkedBlockingQueue(队列容量为Integer.MAX_VALUE), 这会导致以下问题: 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSize和keepAliveTime将会是个无用参数 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行. 由于使用了无界队列, 所以SingleThreadPool永远不会拒绝, 即饱和策略失效
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列; 和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 执行过程与前两种稍微不同: 主线程调用SynchronousQueue的offer()方法放入task, 倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue的task, 即调用了SynchronousQueue的poll(), 那么主线程将该task交给空闲线程. 否则执行(2) 当线程池为空或者没有空闲的线程, 则创建新的线程执行任务. 执行完任务的线程倘若在60s内仍空闲, 则会被终止. 因此长时间空闲的CachedThreadPool不会持有任何线程资源.
关闭线程池
遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程 关闭方式 - shutdown 将线程池里的线程状态设置成SHUTDOWN状态, 然后中断所有没有正在执行任务的线程. ¶关闭方式 - shutdownNow 将线程池里的线程状态设置成STOP状态, 然后停止所有正在执行或暂停任务的线程. 只要调用这两个关闭方法中的任意一个, isShutDown() 返回true. 当所有任务都成功关闭了, isTerminated()返回true.
ThreadPoolExecutor源码详解
几个关键属性
//这个属性是用来存放 当前运行的worker数量以及线程池状态的 //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //存放任务的阻塞队列 private final BlockingQueue<Runnable> workQueue; //worker的集合,用set来存放 private final HashSet<Worker> workers = new HashSet<Worker>(); //历史达到的worker数最大值 private int largestPoolSize; //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略 private volatile RejectedExecutionHandler handler; //超出coreSize的worker的生存时间 private volatile long keepAliveTime; //常驻worker的数量 private volatile int corePoolSize; //最大worker的数量,一般当workQueue满了才会用到这个参数 private volatile int maximumPoolSize;
内部状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } 其中AtomicInteger变量ctl的功能非常强大: 利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态: RUNNING: -1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务; SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务; STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务; TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止; TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成
任务的执行
execute –> addWorker –>runworker (getTask) 线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。 从Woker类的构造方法实现可以发现: 线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。 firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源; execute()方法 查看源码发现使用了double check addWorker方法
任务的提交
任务关闭
更深入理解
为什么线程池不允许使用Executors去创建? 推荐方式是什么?
配置线程池需要考虑的因素
监控线程池的状态
核心: ScheduledThreadExecutor
带着问题理解
ScheduledThreadPoolExecutor简介
ScheduledThreadPoolExecutor数据结构
ScheduledThreadPoolExecutor源码解析
深入理解
工具类: Executors