导图社区 Java 并发锁工具
Java 并发锁工具
AbstractQueueSynchronizer
简介
构建锁或者其他同步组件的基础框架,主要用来管理同步状态
主要使用方式是继承
只能在一个时刻发生阻塞,降低上下文切换的开销
继承
AbstractOwnableSynchronizer
让线程以独占模式占用的同步器的基础实现
成员变量
private transient Thread exclusiveOwnerThread
当前独占同步状态的线程
主要方法
protected final void setExclusiveOwnerThread(Thread thread)
设置独占线程
protected final Thread getExclusiveOwnerThread()
获取当前独占线程
实现接口
Serializable
内部类
static final class Node
简介
AQS内部通过双向的FIFO同步队列来完成同步状态的管理,当有线程获取失败后被添加到队列末尾,队列的结构就是Node;头尾节点定义在AQS中 条件队列(condition queue)也使用Node的定义,但是处于另一个队列中
成员变量
静态常量
static final Node SHARED = new Node()
表示节点处于共享模式
static final Node EXCLUSIVE = null
表示节点处于独占模式,同一时刻只有一个线程持有同步状态
static final int CANCELLED = 1
结点(线程)因为超时或者中断而处于该状态,并且后续不会变为其他状态,也不应该再被阻塞
static final int SIGNAL = -1
当节点(线程)处于该状态时,后继节点处于或者将处于被阻塞状态(通过park),所以当当前节点释放锁或者被取消时需要调用unpark唤醒后继节点。
static final int CONDITION = -2
表示当前节点(线程)处于条件队列中(通过await),此时不属于同步队列的一部分,直到某一刻状态被设置为0为止(通过signal)
static final int PROPAGATE = -3
传播共享锁(头节点使用)
volatile int waitStatus
包括上面的CANCELLED/SIGNAL/CONDITION/PROPAGATE和0(表示不是这四种状态),非负的数表示该节点不需要交互
volatile Node prev
前驱节点
volatile Node next
后继节点
volatile Thread thread
节点对应的线程
Node nextWaiter
在同步队列中表示节点处于共享还是独占状态,等于SHARED表示共享,为null表示独占;在条件队列中表示指向下一个节点的引用
使用Condition仅在同步队列为独占模式才可以,所以该变量设计为共用
主要方法
构造方法
Node()
用于建立初始节点或者构造SHARED节点
Node(Thread thread, Node mode)
用于在addWaiter中使用
Node(Thread thread, int waitStatus)
在Condition中使用
成员方法
final boolean isShared()
return nextWaiter == SHARED
final Node predecessor()
获取前驱节点,为null时抛出异常
public class ConditionObject
简介
Condition接口的实现,服务于基本的Lock实现。使用Node构造等待队列,每个Condition对象都包含一个FIFO队列(单向)
队列使用Node的nextWaiter属性作为指向下一个节点的引用
实现接口
Condition
Serializable
成员变量
静态常量
private static final int REINTERRUPT = 1
表示从wait离开时需要重新中断
private static final int THROW_IE = -1
表示从wait离开时需要抛出InterruptedException
private transient Node firstWaiter
指向条件队列的头结点
private transient Node lastWaiter
指向条件队列的尾节点
主要方法
public final void await()
使调用的线程进入等待队列、释放锁并进入阻塞状态,在节点被signal/signalAll唤醒后尝试获取锁,如果阻塞被中断唤醒则会响应中断
private Node addConditionWaiter()
添加一个新的node到等待队列。
具体实现逻辑:当发现lastWaiter被取消,则调用unlinkCancelledWaiters清除掉整个队列中被取消的方法(firstWaiter、lastWaiter都可能指向新的node);之后new一个node(CONDITION状态)并将其添加到队列末尾
private void unlinkCancelledWaiters()
通过while从firstWaiter往后清除waitStatus不为CONDITION的节点。由于方法调用在释放锁之前,所以循环过程并不需要加锁。为了避免在GC中残留,在没有signal时该方法只在超时或者取消发生时使用
条件队列中的node的waitStatus应该只有CONDITION和CANCELLED两种状态
final int fullyRelease(Node node)
释放锁状态,如果释放锁状态失败则会抛出IllegalMonitorStateException,并将node设置为CANCELLED状态,成功则返回release之前的state值
release
这里调用了AQS的release方法,找到后面的一个合适节点唤醒
参数使用的是AQS的state属性
final boolean isOnSyncQueue(Node node)
判断node是否在同步队列中(注意这里是同步队列而非条件队列):判断waitStatus为CONDITION时说明不在同步队列,node的pre或者next为null也说明不在同步队列(这两个属性用于同步队列的前驱和后继,条件队列不用这两个属性),之后调用findNodeFromTail方法从尾节点遍历同步队列看node是否在其中
同步队列与条件队列的关系:锁的竞争只依赖同步队列,条件队列仅只是将因为缺少条件而阻塞的线程保存起来,当线程需要竞争锁时依然需要转化到同步队列中进行
代码中用该方法作为while的判断条件,如果node不在同步队列中(说明并没有被signal/signalAll),则会进入while内部通过park阻塞。当node被唤醒后会先判断是否被中断唤醒,如果不是则会继续回到while过程,否则会尝试将node加入同步队列并break跳出while
acquireQueued(node, savedState)
不停地为同步队列中的node节点尝试获取savedState同步状态
unlinkCancelledWaiters
获取到锁之后如果node.nextWaiter不为null的话调用该方法清除取消状态的节点
reportInterruptAfterWait
如果之前是因为中断被唤醒,则这里需要根据之前判断的结果对中断进行处理,是重置中断状态还是抛出中断异常
public final long awaitNanos(long nanosTimeout)
基本逻辑与await类似,加入了超时的判断
transferAfterCancelledWait的后面部分不是很懂
public final boolean await(long time, TimeUnit unit)
基本逻辑与awaitNanos类似,可以指定时间单位,最终都是换算为纳秒进行计算
public final void awaitUninterruptibly()
基本逻辑与await类似,不响应中断(即selfInterrupt重置中断状态)
public final boolean awaitUntil(Date deadline)
基本逻辑与await类似,加入了最大阻塞时间
public final void signal()
将等待最长的线程从条件队列移动到同步队列
protected boolean isHeldExclusively()
返回当前线程是否以独占模式持有该锁,是则返回true。该方法是AQS中的,并且没有提供具体的实现,但是由于该方法只在condition中使用,所以如果不使用ConditionObject的话可以不用实现,否则需要实现
private void doSignal(Node first)
将条件队列中的节点移除队列并转化为同步队列的节点。实现过程是个循环,从前到后,直至遇到一个非null和不是CANCELLED状态的节点,将其转化后退出循环
final boolean transferForSignal(Node node)
将node从条件队列转换到同步队列,返回是否成功转换。实现逻辑为先判断node状态是否为CONDITION,如果不是说明是被cancel的节点,返回false;接着调用enq入同步队列并取得node在队列中的前驱节点,之后如果前驱节点的状态>0或者CAS修改waitStatus失败(发生了变化),则unpark该线程,最后返回true
加入同步队列后不一定直接唤醒,只有waitStatus>0或者在CAS执行期间发生了变化,才会unpark唤醒线程。此时不唤醒的话会等到同步队列逻辑中再唤醒
enq
public final void signalAll()
将条件队列中所有符合条件(未被取消)的节点转移到同步队列中。基本逻辑与signal相似,区别在于transferForSignal方法循环执行
isHeldExclusively
private void doSignalAll(Node first)
transferForSignal
protected final boolean hasWaiters()
是否有线程在条件队列中,实现方式为从头开始循环条件队列,如果有节点状态为CONDITION则返回true,否则返回false
成员变量
静态变量
static final long spinForTimeoutThreshold = 1000L;
用于超时的阈值,如果小于这个值就没有必要调用带超时的park,而是让循环继续执行,这时的自旋效率更高
private static final Unsafe unsafe
Unsafe.getUnsafe(),下面的都是通过objectFieldOffset方法获取偏移量,方便后续直接通过Unsafe的CAS操作进行修改
private static final long stateOffset
AQS的state
private static final long headOffset
AQS的head
private static final long tailOffset
AQS的tail
private static final long waitStatusOffset
Node的waitStatus
private static final long nextOffset
Node的next
private transient volatile Node head
同步队列的头结点,懒加载,只通过setHead修改;头节点存在时其状态不能为CANCELLED
private transient volatile Node tail
同步队列的尾节点,懒加载
private volatile int state
同步状态
主要方法
protected final int getState()
返回同步状态的当前值
protected final void setState(int newState)
设置当前同步状态
protected final boolean compareAndSetState(int expect, int update)
CAS设置state,底层调用Unsafe的CAS方法
private Node addWaiter(Node mode)
入列方法
用指定模式添加当前线程进入队列末尾
值得注意的是
参数Node用于指定模式,线程通过currentThread获取
实现中tail不为空时先快速尝试一遍cas设置tail,成功直接返回,失败再调用enq方法
private Node enq(final Node node)
循环(自旋)CAS,tail为空时先CAS初始化head(新建node),tail不为空时CAS设置tail
这里参数node为需要添加的节点而非模式
public final boolean hasQueuedPredecessors()
判断是否有节点在当前线程之前,有的话返回true,否则返回false。用于公平锁的实现。 具体实现:当队列不为空时判断head的后继节点为空或者不为当前线程则返回true
什么时候head.next 为null?
public final boolean hasQueuedThreads()
return head != tail; 队列中是否有线程等待
public final boolean isQueued(Thread thread)
线程是否在队列中,实现方式是从尾到头循环
public final int getQueueLength()
获取队列长度,实现方式为从尾到头循环计算thread不为null的节点数量
public final Collection<Thread> getQueuedThreads()
获取在队列中的线程集合,以Collection的形式返回。实现方式为构造ArrayList,从尾到头循环将不为null的线程添加进去,之后返回这个ArrayList对象
public final boolean owns(ConditionObject condition)
condition对象是否属于当前同步器AQS
独占模式
同一时刻只有一个线程能获取同步状态
public final void acquire(int arg)
独占模式获取同步状态,忽略中断;先调用tryAcquire尝试获取独占锁,失败后调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)尝试将线程加入同步队列和判断节点状态,然后根据返回的结果(是否有过线程中断)调用线程interrupt方法恢复中断状态
acquire本身不响应中断,故对于中断的处理是恢复中断状态
protected boolean tryAcquire(int arg)
尝试获取独占锁,成功返回true,失败返回false,具体实现由子类完成,需要保证获取同步状态时的线程安全
final boolean acquireQueued(final Node node, int arg)
arg可以理解为需要竞争的资源,AQS内部用state来代表,但实际上具体的用途由开发人员自行定义
为已经在队列中的线程不间断的尝试获取锁(node已经由addWaiter创建)
实现逻辑:自旋如下过程:如果前驱节点是head并且获取锁成功,则设置当前节点为head,返回不没有被中断;否则进行后面的操作:调用shouldParkAfterFailedAcquire判断和设置节点状态以及调用parkAndCheckInterrupt阻塞线程和检查中断状态。如果过程中出现异常,则调用cancelAcquire取消获取
实际上即使没有获取到锁也不会不停的循环,当成功设置前驱节点的状态并用park中断线程后,线程就被挂起了; head后的节点会使用tryAcquire与新线程竞争锁,说明锁在这里的实现是非公平的
private void setHead(Node node)
将节点设置为head,同时置thread和prev属性为null方便GC。实际上相当于从队列头部出队
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
检查和更新获取锁失败的节点的前驱节点状态为SIGNAL,返回当前线程是否需要阻塞
实现逻辑:判断前驱节点waitStatus,当为SIGNAL时返回true;当>0时说明前驱节点已经被取消,循环跳过前驱节点,直到某个前驱节点的waitStatus<=0时,之后返回false;否则CAS设置前驱节点的waitStatus为SIGNAL,返回false
private final boolean parkAndCheckInterrupt()
调用park,检查中断状态并返回
return thread.interrupted() 这里返回的结果用于判断线程被唤醒是否是因为中断,如果因为中断被唤醒,在acquireQueued中依然在循环中,会再次被park阻塞(由于interrupted()方法会清除中断标志,所以再次park可以生效);当被unpark唤醒时会获取到锁;伪唤醒时通过外层循环再次检查即可
伪唤醒的原因?
private void cancelAcquire(Node node)
在整个方法抛出异常时调用,将线程从Node中置空,状态置CANCELLED,向前找到waitStatus<=0的节点作为前驱;把自己从队列中去除,在需要的情况下需要唤醒后面的某个合适的节点
private void unparkSuccessor(Node node)
需要注意的是,如果node的next不为空,则将其unpark;如果node的next为null,则从tail出发从后往前找到不为node的最前面的一个waitStatus<=0的节点将其unpark
之所以从后往前找是因为双向链表设置前驱和后继并不是原子操作,可能出现某个节点的next为空但是已经在队列里的情况,又或者是节点刚被取消,其next指向了自己,而遍历也刚好来到该节点
public final void acquireInterruptibly(int arg)
响应中断的独占模式获取同步状态,会在tryAcqure前先用Thread.interrupted()判断是否中断,是的话抛出异常
tryAcquire
先尝试一次
private void doAcquireInterruptibly(int arg)
基本逻辑与acquireQueued相同,区别在于内部调用addWaiter以及不返回是否有线程中断而是直接在检查到由中断唤醒park后抛出InterruptedException异常
cancelAcquire
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
带超时时间的独占式获取同步状态,响应中断。如果没有在超时时间内获取同步状态则返回false
tryAcquire
先尝试一次
private boolean doAcquireNanos(int arg, long nanosTimeout)
基本逻辑与doAcquireInterruptibly相同,增加了时间的判断逻辑,另外使用了spinForTimeoutThreshold阈值
cancelAcquire
public final boolean release(int arg)
独占模式释放同步状态。首先调用tryRelease尝试修改state,成功之后判断head != null并且head.waitStatus!=0(这里head的waitStatus为0则表示空队列)时调用unparkSuccessor唤醒后面的节点,然后返回true;tryRelease失败返回false
protected boolean tryRelease(int arg)
尝试修改state,AQS中没有具体实现,需要子类自行实现。release方法的arg变量用于该方法;返回值为是否成功释放state
arg的说明见下方实现部分
unparkSuccessor
共享模式
同一时刻有多个线程获取同步状态
public final void acquireShared(int arg)
共享模式获取同步状态,忽略中断。实现:先调用tryAcquireShared尝试获取同步状态,失败(结果小于0)后调用doAcquireShared自旋获取同步状态
protected int tryAcquireShared(int arg)
尝试获取同步状态,AQS中不提供具体方法,由子类实现。返回值小于0表示获取失败;等于0表示获取成功,但随后没有其他共享模式获取成功;大于0表示获取成功并且有其他的共享模式获取也可能成功
private void doAcquireShared(int arg)
入队后自旋尝试获取同步状态,基本逻辑与独占模式下的acquireQueued基本相同,区别在于当节点的前驱为head时,需要对tryAcquireShared返回的结果进行判断,如果大于等于0(说明依然有资源,可以继续传播),则调用setHeadAndPropagate设置head以及传播属性(设置新的head并判断后继节点,如果合适就唤醒);否则依然需要调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt进行后续操作(park以及后续中断判断等)。另外对中断状态的设置selfTnterrupt方法也放在该方法中执行
tryAcquireShared
节点的前驱为head时先尝试一遍
private void setHeadAndPropagate(Node node, int propagate)
传播的体现。该方法实际检查的是当前获取到锁的节点的后继节点,即可以向后传播一次。开始的时候我在疑惑为什么没有找到类似于循环中不停向后唤醒的代码,后来发现setHeadAndPropagate本身就在for(;;)中,每个被唤醒的节点如果获取到了锁都会执行该方法,这样就达到一个一个向后传播的目的。注意并不是一起被唤醒然后竞争,还是一个一个被唤醒的。
继续向后唤醒节点。 该方法在tryAcquireShared返回结果r>=0(说明还有资源可用)时才会调用,r作为propagate参数传入方法。该方法会将node设置为head,之后判断是否可以向后传递,如果可以则调用doReleaseShared唤醒后继节点
判断逻辑: propagate>0(说明还有许可) 或者 之前的head == null 或者 之前的head.waitStatus < 0 或者 现在的head(也就是node) == null 或者 现在的head.waitStatus < 0 然后进入下一步: node.next == null 或者 node.next.isShared()
实际上并不是很明白: 1.为什么需要判断现在的head((h = head) == null) 2.为什么node.next == null后依然还要doReleaseShared
1的推测:因为判断过程中head可能被其他线程修改,所以如果新的head依然满足这个条件那么还是可以继续;感觉这里的判断逻辑更像是不知道为什么head为null,但还是尝试一下,一种保守的处理策略,那2的话可能也是类似的保守策略
只判断waitStatus<0是因为head有可能状态更改为了SIGNAL,而CONDITION并不会出现在这里
private void doReleaseShared()
自旋唤醒后继节点,与独立模式不同点在于还要保证传播的性质 实现:循环:当队列不为空时,判断head的waitStatus,等于SIGNAL时先CAS设置waitStatus为0,成功则调unparkSuccessor(head)唤醒后继节点,失败则从头再来;waitStatus为0并且CAS设置waitStatus为PROPAGATE失败时也重头再来;之后判断head==h看头结点是否有修改,如果有修改则退出循环
注意该方法没有传参,直接从head开始
推测:共享模式node可能只有0(刚创建的)、PROPAGATE和SIGNAL三种状态
unparkSuccessor
cancelAcquire
public final void acquireSharedInterruptibly(int arg)
共享模式获取同步状态,支持中断,支持的方式与独占模式类似
tryAcquireShared
private void doAcquireSharedInterruptibly(int arg)
与doAcquireShared逻辑类似,在检查到中断后抛出异常
cancelAcquire
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
带超时时间的共享模式获取同步状态,支持中断。
tryAcquireShared
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
超时判断逻辑与独占模式类似,其他执行逻辑与doAcquireShared类似
cancelAcquire
public final boolean releaseShared(int arg)
共享模式释放同步状态
protected boolean tryReleaseShared(int arg)
空方法,由子类实现,尝试释放同步状态。arg是releaseShared的参数,可以自行定义;返回值为当这个方法成功释放了同步状态,其他线程可以acquire时返回true,否则返回false
doReleaseShared
在这里依然只是唤醒后续一个节点,后续唤醒的传播由唤醒的这个节点进行
如何基于AQS实现自己的锁
竞争的资源
AQS用私有变量state抽象地代表锁竞争的资源(比如1表示占用,0表示未被占用)
对state的操作需要使用相关的方法:getState、setState、compareAndSetState等
按需实现的方法
tryAcquire、tryAcquireShared、tryRelease、tryReleaseShared、isHeldExclusively
AQS中的各种方法的参数arg最终都是传入这些方法的,故这些方法决定了是否需要arg参数以及怎么用,这是源代码注释中“can represent anything”的含义
排他锁(独立模式)
protected boolean tryAcquire(int arg);
实现查询并获取竞态资源,通常由state的CAS操作完成,传入参数可由开发者自行定义
简单示例: protected boolean tryAcquire(int acquires) { assert acquires == 1; // 定义传入参数只能是1 if (compareAndSetState(0, 1)) { return true; } return false; } protected boolean tryRelease(int releases) { assert releases == 1; // 定义传入参数只能是1 if (getState() == 0) throw new IllegalMonitorStateException(); setState(0); return true; }
protected boolean tryRelease(int arg);
实现释放竞争资源,通常大概是state变量自减或者清零,传入参数可由开发者自行定义
方法内部不应该出现阻塞和等待的情况
共享锁
protected int tryAcquireShared(int acquires)
传入参数由开发者自行定义,可视为想要获取的资源个数;返回值int可视为剩余的共享资源个数
简单示例: protected int tryAcquireShared(int acquires) {//non-fair for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (compareAndSetState(current, next)) return true; } }
protected boolean tryReleaseShared(int releases)
传入参数由开发者自行定义,可视为想要释放的资源个数
条件变量
虽然AQS的实现逻辑没有用到state,但在使用条件变量时会默认将state作为release(int arg)的传入参数,故在实现这些的方法时往往需要修改state变量
protected boolean isHeldExclusively()
返回当前线程是否独占了条件变量对应的互斥锁
常见的实现方式: protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } protected boolean tryAcquire(int acquires) { ...... setExclusiveOwnerThread(Thread.currentThread()); return true; }
ReentrantReadWriteLock
ReentrantLock
简介
可重入锁,分为公平锁和非公平锁两种方式
实现接口
Lock
Serializable
内部类
abstract static class Sync
抽象类,是ReetrantLock的实现基础
继承
AbstractQueuedSynchronizer
主要方法
abstract void lock()
获取锁,抽象方法吗,需要子类实现
final boolean nonfairTryAcquire(int acquires)
非公平的tryLock实现 具体实现为:当前没有线程获取锁时CAS获取锁,成功就设置锁独占线程为当前线程;如果当前线程已经持有了锁,则更新当前锁的数量。这两种情况会返回true,表示持有锁,其他情况均返回false
那么为什么一个非公平的实现会放在父类Sync中?因为该方法公平锁和非公平锁都会用到,二者的tryLock都是基于该实现
protected final boolean tryRelease(int releases)
AQS中tryRelease的实现,具体为先计算锁总数减去释放锁数量得到的剩余持有锁的数量c,如果c为0则清空状态;设置state为c的值,返回是否没有锁被持有
protected final boolean isHeldExclusively()
AQS中isHeldExclusively实现,return getExclusiveOwnerThread() == Thread.currentThread()
final int getHoldCount()
返回当前线程持有了多少锁。该方法依赖于isHeldExclusively的判断
final boolean isLocked()
return getState() != 0; 是否有线程持有锁
final Thread getOwner()
返回持有锁的线程,没有时返回null
final ConditionObject newCondition()
new ConditionObject()
private void readObject(java.io.ObjectInputStream s)
反序列化方法,state置为0
static final class FairSync
公平锁实现类
继承
Sync
主要方法
final void lock()
acquire(1)
这里的参数1是有实际意义的,与CountDownLatch可不一样
protected final boolean tryAcquire(int acquires)
公平锁的tryAcquire实现。实现过程为:占用锁c为0时,如果前面没有线程并且cas设置c为acquires成功,则设置当前线程为独占线程并返回true;如果当前线程已经持有锁,则更新锁数量并返回true;其他情况均返回false
hasQueuedPredecessors
这里比非公平锁多了这个判断,即使当前没有线程获取锁,如果有等待时间更长的线程也会放弃这次获取锁
static final class NonfairSync
非公平锁实现类
继承
Sync
主要方法
final void lock()
先尝试直接cas(0,1),如果成功说明当前正好没有别人获取锁,当前线程成功抢到锁,之后设置独占线程为当前线程;如果失败再acquire(1)
实际上非公平锁的tryAcquire方法中也会尝试cas,这里直接cas可能是为了更加突出不公平性吧,更早的调用则可能会更早的获取到资源
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires) 非公平的实现已经在Sync中存在了,直接调用
成员变量
private final Sync sync
表示锁使用公平还是非公平实现,实例化后对应FairSync或者NonfairSync
构造函数
public ReentrantLock()
默认构造非公平锁
public ReentrantLock(boolean fair)
根据fair选择构造公平锁或者非公平锁
主要方法
public void lock()
sync.lock()
public void lockInterruptibly()
sync.acquireInterruptibly(1)
public boolean tryLock()
return sync.nonfairTryAcquire(1);如果成功获取锁或者当前线程已经持有锁,则返回true,其他情况返回false
这里无论是什么锁,这里都是非公平的尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
return sync.tryAcquireNanos(1, unit.toNanos(timeout))
tryAcquireNano调用了tryAcquire,故这里是有公平和非公平的区别的
public void unlock()
sync.release(1)
public Condition newCondition()
return sync.newCondition()
public int getHoldCount()
return sync.getHoldCount() 当前线程持有的锁数量
public boolean isHeldByCurrentThread()
return sync.isHeldExclusively() 当前线程是否持有锁
public boolean isLocked()
return sync.isLocked()
public final boolean isFair()
return sync instanceof FairSync
protected Thread getOwner()
return sync.getOwner()
public final boolean hasQueuedThreads()
return sync.hasQueuedThreads() 是否有线程在队列中
public final boolean hasQueuedThread(Thread thread)
return sync.isQueued(thread); 当前线程是否在队列中
public final int getQueueLength()
return sync.getQueueLength();
protected Collection<Thread> getQueuedThreads()
return sync.getQueuedThreads();
public boolean hasWaiters(Condition condition)
是否有线程在给定condition上等待(对应条件队列上是否有CONDITION状态的线程)
public int getWaitQueueLength(Condition condition)
给定condition的条件队列中CONDITION状态的线程数量
protected Collection<Thread> getWaitingThreads(Condition condition)
Condition
简介
接口;条件变量主要用来管理线程执行对某些状态的依赖,将Object的monitor方法(wait/notify/notifyAll)对应到直接的对象上,通过将他们与任意Lock实现的使用相结合来实现每个对象拥有多个等待集的效果。相当于Lock代替了synchronize,Condition代替了monitor方法,即Condition需要与Lock配合使用
Condition与monitor方法的主要区别有: 1. Condition支持多个等待队列,一个Lock实例可以绑定多个Condition 2. Condition可以支持响应中断和超时设置
绑定多个Condition的深层含义就是可以针对不同的情况对线程进行管理,所谓的“更精细”的控制,因为不同条件而阻塞的线程在不同的条件队列中,在需要被唤醒的时候才会进入同步队列进行竞争,本身就做了区分和隔离
理解
与一般的锁竞争的区别在于condition强调的是条件,多线程环境下需要满足条件才可以执行某些操作;而一般的锁竞争则只是相当于竞争执行某段代码的权利
通常的编程模型: 获取锁; while (条件状态不满足) { 释放锁; 线程挂起等待,直到条件满足通知; 重新获取锁; } 临界区操作; 释放锁;
三元关系:锁、wait、条件断言(predicate)。每一次wait调用都会隐式地与特定条件断言关联起来;调用某个特定条件断言的wait时,调用者必须已经持有该断言相关的条件队列的锁,且这个锁必须保护着构成条件断言的状态变量
predicate:一个返回bool类型的函数
创建
通过Lock接口的newCondition方法创建,需要实现的类自行实现逻辑(本质上都是创建ConditionObject的实例)
主要方法
void await()
让线程等待,直到被signal或者interrupt唤醒;调用时会自动释放condition关联的锁,线程会停止执行当前任务,直到被signal/signalAll/interrupt/spurious wakeup唤醒。在调用await的方法结束之前线程需要重新尝试获取condition关联的锁
boolean await(long time, TimeUnit unit)
与awaitNanos类似,可以指定时间的单位,时间耗完时返回false,否则返回true
long awaitNanos(long nanosTimeout)
与await类似,但是最迟会在指定时间后醒来,返回没有用到的时间
void awaitUninterruptibly();
与await类似,但是不会响应中断
boolean awaitUntil(Date deadline)
与awaitNanos类似,指定时间的方式不同,如果deadline已经达到则返回false,否则返回true
void signal()
唤醒一个等待状态的线程,线程唤醒后需要再次获取锁
void signalAll()
唤醒所有等待状态的线程
Lock
简介
接口,提供了通用的一些请求/释放锁的方法
与synchronized的对比
synchronized的相关机制(比如monitor对象等)是内置的,在语言层面无法访问,而Lock是语言层面用来替代的方案,故可以获得一些语言层面的便利(比如知道是否成功获取到锁等等)
synchronized的加锁和解锁过程是由关键字自行完成,而Lock需要显式调用加锁和解锁方法
Lock可以使用条件变量来更灵活的使用锁
Lock可以响应中断,而synchronized不会
主要方法
void lock()
void lockInterruptibly()
请求锁过程中会响应中断
boolean tryLock()
尝试获取锁并直接返回结果(并不会阻塞当前线程)
boolean tryLock(long time, TimeUnit unit)
带超时和响应中断的tryLock
void unlock();
Condition newCondition()
返回一个与该锁相关联的Condition对象
LockSupport
简介
用来创建锁和其他同步类的基本线程阻塞原语(与许可(permit)相关)
底层依赖Unsafe实现
构造方法
私有,不允许构造该类的对象
成员变量
private static final sun.misc.Unsafe UNSAFE
在static代码块中构建
sun.misc.Unsafe
private static final long parkBlockerOffset
通过Unsafe获取Thread类对象中parkBlocker属性的偏移位置
parkBlocker是线程被阻塞时的记录,方便监视和诊断工具识别线程被阻塞的原因;线程未被阻塞时为null
private static final long SEED
通过Unsafe获取Thread类中threadLocalRandomSeed属性的偏移位置
private static final long PROBE
通过Unsafe获取Thread类中threadLocalRandomProbe属性的偏移位置
private static final long SECONDARY
通过Unsafe获取Thread类中threadLocalRandomSecondarySeed属性的偏移位置
这三个属性由ThreadLocalRandom来进行设置
主要方法
public static Object getBlocker(Thread t)
通过Unsafe获取Thread对象中的parkBlocker对象
private static void setBlocker(Thread t, Object arg)
私有方法,通过Unsafe设置Thread中的parkBlocker对象,下面的设置blocker的方法都调用该方法
public static void park()
阻塞线程,等待“许可”(permit)后继续执行
具体实现(Hotspot):Parker类
具体的实现与平台相关,每个线程都有一个Parker实例,在linux实现中实际上是以mutex和condition最终实现了锁和阻塞的过程(具体看笔记文章,这里不写了)
主要成员变量(linux实现)
private volatile int _counter
表示许可,大于0表示已获取许可,每次park只会将其设置为1,不会递增
故调用两次unpark后调用两次park依然会在第二次时阻塞
protected pthread_mutex_t _mutex[1]
互斥变量
在代码中可以通过linux原子指令pthread_mutex_lock、pthread_mutex_trylock、pthread_mutex_unlock对变量进行加锁和解锁操作
protected pthread_cond_t _cond[1]
条件变量
利用线程间共享的全局变量进行同步的一种机制,一般和互斥锁结合使用(避免条件变量的竞争);可以通过pthread_cond_wait、pthread_cond_timedwait等指令进行操作
linux内核中同样有等待队列来管理阻塞进程
方法
park
unpark
与wait在使用上的区别
wait需要等待notify才可以唤醒,存在先后过程,而park等待的unpark可以先于park调用,此时park之后的程序可以继续执行
wait需要获取对象的监视器,而park不需要
注意
park中发生中断,则park也会返回但不会抛出中断异常
public static void park(Object blocker)
由于parkBlocker的作用,因此带blocker参数的park方法更推荐使用,使用后在通过jstack等诊断工具时可以看到线程阻塞在哪个对象上的提示,比如parking to wait for <0x000000045ed12298> (a xxxx object全限定名)
public static void parkNanos(Object blocker, long nanos)
public static void parkNanos(long nanos)
最多阻塞多少时间后程序继续执行
public static void parkUntil(Object blocker, long deadline)
public static void parkUntil(long deadline)
public static void unpark(Thread thread)