导图社区 java多线程并发编程知识点总结
java多线程并发编程知识点总结,内容有并发基础、并发工具类、锁、其他、线程通信、并发合集、线程池、原子操作等。
编辑于2022-05-09 21:47:40java多线程并发编程知识点总结
并发 基础
AQS
AbstractqueuedSynchronizer同步器
> 用来构建锁或其他同步组件的基础框架。 内部通过一个int 类型的成员变量state来控制同步状态: - state=0: 说明没有任何线程占有共享资源的锁。 - state = 1:说明有线程正在使用共享变量,其他线程必须加入同步队列进行等待。 `AQS`内部通过内部类Node构成FIFO的同步队列来 完成线程获取锁的排队工作,同时利用内部类ConditionObject构建等待队列。 - Condition调用wait()方法后,线程将会加入等待队列中。 - Condition调用signal()方法后,线程 将从等待队列移动同步队列中进行锁竞争。 **上面设计到两个队列** - 同步队列:当线程请求锁而等待的请求将加入同步队列等待 。 - 等待队列(可能多个): 通过Condition调用await()方法释放锁后,将加入等待队列。
队列同步器
`同步器的设计`是基于`模板方法`模式的,也就是说,使用者需要继承同步器并重写指定的 方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些 模板方法将会调用使用者重写的方法
以下3个方法来 修改同步 状态
getState():获取当前同步状态
setState(int newState):设置当前同步状态
compareAndSetState(int expext,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
同步器可重写的方法基本 为3类
独占式 获取与释放 同步状态
共享式获取与释放不同状态
查询同步队列中的等待线程情况
独占锁
同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁
实现方式
同步队列
同步器依赖内部的同步队列(`FIFO`双向队列)来完成同步状态的管理,当前线程获取状态失败时,同步器会将当前线程以及等待状态等信息构造成称为一个节点(`Node`)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
独占式同步状态获取与释放
`独占锁概念`:同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁。 通过调用同步的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步 状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除,该代码如下: ``` public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` 上述代码主要完成`同步状态获`、`节点构造`、`加入同步队列`以及在同`步对队列中自`旋等待的相关工作。 主要逻辑如下: - 首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,统一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以"以死循环"的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点 的出队或阻塞线程被中断来实现。
共享式同步状态获取与释放
共享锁(读锁):统一时刻可以有多个线程获取到同步状态
CAS
Compare And Swap
CAS 的全称是 Compare And Swap 即比较交换,其算法核心思想如下 函数:CAS(V,E,N) 参数:V 表示要更新的变量 E 预期值 N 新值 如果 V 值等于 E 值,则将 V 的值设为 N。若 V 值和 E 值不同,则说明已经有其他线程做了 更新,则当前线程什么都不做。通俗的理解就是 CAS 操作需要我们提供一个期望值,当期 望值与当前线程的变量值相同时,说明还没线程修改该值,当前线程可以进行修改,也就是 执行 CAS 操作,但如果期望值与当前线程不符,则说明该值已被其他线程修改,此时不执 行更新操作,但可以选择重新读取该变量再尝试再次修改该变量,也可以放弃操作
缺陷
ABA
因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值 原来是A,变成了B,又变成了B,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了,。
ABA解决方案
解决思路就是使用版本号。在变量前面追加版本号,每次变量更新的时候把版本号+1,那么A->B->A 就会变成1A->2B->3A。从JDK1.5 开始,JDK的Atomic包里提供了一个类Atomic包里提供了一个AtomicStampedReference来解决ABA问题, ``` public boolean compareAndSet( V expectedReference, // 预期引用 V newReference, // 更新后的引用 int expectedStamp, // 预期标志 int newStamp // 更新后的标志 ) ``` `compareAndSet`方法的作用首先检查当前引用是否等于预期 引用,并且检查当前标志是否 等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长 开销大
自旋`CAS`如果长时间不成功,会给`CPU`带来非常大的执行开销
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环`CAS`的方式来保证原子操作,但是对多个共享变量操作时,循环`CA`S就无法保证操作的原子 性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量`i=2,j=a`,合并一下ij=2a,然后用`CA`S来操作ij。从Java 1.5开始,JDK提供了`AtomicReference`类来保证引用对之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
线程间通信
volatile和synchronized关键字
`volatile`:用来修饰子字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而 对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。 `synchronized`;可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
等待/通知
**等待方遵循如下原则** 1)获取对象的锁 2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条 3)条件满足 则执行对应的逻辑 对应的伪代码 ``` synchronized(对象) { while(条件不满足) { 对象.wait(); } 对应的处理逻辑 } ``` 通知方遵循如下原则。 1)获得对象的锁。 2)改变条件。 3)通知所有等待在对象上的线程。 对应的伪代码如下。 ``` synchronized(对象) { 改变条件 对象.notifyAll(); } ```
使用wait()、notify()和notifyAll()时需要先对调用对象加锁
调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列
notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifyAll的线程释放锁之后,等待的线程 才有机会从wait()返回。
notify()方法将等待队列中的一个等待线程从等待队列 中移到同步队列中,而notifyAll()方法将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKING .
从wait()方法返回的前提 是获得了调用对象的锁
Thread.join()
如果一个线程A执行了thread.join()语句,其含义是当前线程 A等待thread线程终止之后 才从thread.join()返回, ``` public class Join { public static void main(String[] args) throws Exception { Thread previous = Thread.currentThread(); for (int i = 0; i < 10; i++) { // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回 Thread thread = new Thread(new Domino(previous), String.valueOf(i)); thread.start(); previous = thread; } TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + " terminate."); } static class Domino implements Runnable { private Thread thread; public Domino(Thread thread) { this.thread = thread; } public void run() { try { thread.join(); } catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName() + " terminate."); } } } ```
ThreadLocal
可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取原先设置的值; ``` public class Profiler { // 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次 private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() { protected Long initialValue() { return System.currentTimeMillis(); } }; public static final void begin() { TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final long end() { return System.currentTimeMillis() - TIME_THREADLOCAL.get(); } public static void main(String[] args) throws Exception { Profiler.begin(); TimeUnit.SECONDS.sleep(1); System.out.println("Cost: " + Profiler.end() + " mills"); } } ```
并发工具类
CyclicBarrier
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。示例代码如下所示: ``` public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } } ``` `CyclicBarrier`还提供一个更高级的构造函数`CyclicBarrier(int parties,Runnable barrier- Action)`,用于在线程到达屏障时,优先执行`barrierAction`,方便处理更复杂的业务场景,
应用场景
`CyclicBarrier`可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个`Excel`保存了用户所有银行流水,每个`Sheet`保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个`sheet`里的银行流水,都执行完之后,得到每个`sheet`的日均银行流水,最后,再用`barrierAction`用这些线程的计算结果,计算出整个`Excel`的日均银行流水。 ``` public class BankWaterService implements Runnable { /** * 创建4个屏障,处理完之后执行当前类的run方法 */ private CyclicBarrier c = new CyclicBarrier(4, this); /** * 假设只有4个sheet,所以只启动4个线程 */ private Executor executor = Executors.newFixedThreadPool(4); /** * 保存每个sheet计算出的银流结果 */ private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); private void count() { for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { // 计算当前sheet的银流数据,计算代码省略 sheetBankWaterCount.put(Thread.currentThread().getName(), 1); // 银流计算完成,插入一个屏障 try { c.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run() { int result = 0; // 汇总每个sheet计算出的结果 for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); } // 将结果输出 sheetBankWaterCount.put("result", result); System.out.println(result); } public static void main(String[] args) { BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } } ```
CountDownLatch
`CountDownLatch`允许一个或多个线程等待其他线程完成操作。 > 假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用 join()方法 ``` public class JoinCountDownLatchTest { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish"); } } ``` join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去 ,代码如下: ``` while (isAlive()) { wait(0); } ``` > 直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里 实现的,所以在JDK里看不到,大家可以查看JVM源码。 CountDownLatch实现: ``` public class CountDownLatchTest { staticCountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); System.out.println(2); c.countDown(); } }).start(); c.await(); System.out.println("3"); } } ``` ------------------------------ 当我们调用`CountDownLatch`的`countDown`方法时,N就会减1,`CountDownLatch`的`await`方法会阻塞当前线程,直到N变成零。由于`countDown`方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个`CountDownLatch`的引用传递到线程里即可。 如果有某个解析`shee`t的线程处理得比较慢,我们不可能让主线程一直等待,所以可以使用另外一个带指定时间的`await`方法——`await(long time,TimeUnit unit)`,这个方法等待特定时 间后,就会不再阻塞当前线程。`join`也有类似的方法。
一个线程调用countDown方法happen-before,另外一个线程调用await方法。
CyclicBarrier和CountDownLatch的区别
`CountDownLatch`的计数器只能使用一次,而`CyclicBarrier`的计数器可以使用`reset()`方法重置。所以`CyclicBarrier`能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。 ----------- |CountDownLatch|CyclicBarrier| |---|---| |减计数方式|加计数方式| |计算为0时释放所有等待的线程|计数达到指定值时释放所有等待线程| |计数为0时,无法重置|计数达到指定值时,计数置为0重新开始| |调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响|调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞| |不可重复利用|可重复利用|
Semaphore
`Semaphore`(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
使用场景
`Semaphore`可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用`Semaphore`来做流量控制。 ----------- ``` public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorServicethreadPool = Executors .newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (inti = 0; i< THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } } ```
锁
ReentrantLock
实现重进入
`重进入`是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决一下两个问题: 1、`线程再次进入`:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。 2、`锁的 最终释放`:线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行技术自增,技术表示当前锁被重复获取的次数,而锁被释放时,计数自减去,当技术为0时表示锁已经成功释放。 `成功`获取锁的线程再次获取锁,只是增加了同步状态值。 如果该锁被获取了`n`次,那么前`(n-1)`次`tryRelease(int releases)`方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条 件,当同步状态为0时,将占有线程设置为null,并返回`true`,表示释放成功。
公平锁
锁的获取顺序应该符合请求的绝对时间顺序,也就是FIFO。
非公平锁
只要CAS设置同步状态成功,则表示当前线程获取了锁
ReentrantReadWriterlock
volatile
volatile实现机制
`volatile`是轻量级的`synchronized`,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。`volatile`不会引起上下文的切换 和 调度。
内存语义
volatile的特性
`volatile`变量自身具有下列特性: - 可见性 对一个`volatile`变量的读,总是能看到(任意线程)对这个`volatile`变量最后的写入。 - 原子性 对任意单个`volatile`变量的读/写具有原子性,但类似于`volatile++`这种符合操作不具有原子性
volatile写-读的内存语义
- 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。 - 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
volatile内存语义的实现
 - 当第二个操作时volatile写时,不管第一个操作时什么,都不能重排序 - 当第一个操作时volatile读时,不管第二个操作是什么,都不能排序 - 当第一个操作时volatile写,第二个操作时volatile读时,不能重排序 > 为了实现volatile的内存语义,编译器在生成字节码时,会在指令列中插入内存屏障来禁止特定类型的处理器重排序。下面是基于保守策略的JMM内存屏障插入策略。 - 在每隔volatile写操作 的前后插入一个StoreStore屏障。 - 在每个volatile写操作的后面插入一个StoreLoad屏障。 - 在每个volatile读操作的后面插入一个LoadLoad屏障 - 在每个volatile读操作的后面插入一个LoadStore屏障
锁的内存语义
> 锁释放和锁获取语义 - 线程A释放一个锁,实质上线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做的修改的消息。 - 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在这个锁之前对共享变量所做的修改的)消息 - 线程A释放锁,随后线程B获取这个锁,这个过程实质上线程A通过主内存向线程B发送消息
NonfairSync
NonfairSync
> 锁释放和锁获取语义 - 线程A释放一个锁,实质上线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做的修改的消息。 - 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在这个锁之前对共享变量所做的修改的)消息 - 线程A释放锁,随后线程B获取这个锁,这个过程实质上线程A通过主内存向线程B发送消息
concurrent包的实现
 仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式: - 声明共享变量的volatile - 使用CAS的原子条件更新来实现线程之间的同步 - 配合以volatile的读/写和CAS所具有volatile读和写的内存语义来实现线程之前的通信
Lock接口
``` Lock lock = new ReentrantLock(); lock.lock(); try{ }finally{ lock.unlock(); } ``` 在finally块中释放锁,目的是保证在获取到锁之后,最终能释放锁。
Condition
Condition接口提供了类似Object的 监视器方法,与Lock配合可以实现等待/通知模式  ``` public class BoundedQueue<T> { private Object[] items; // 添加的下标,删除的下标和数组当前数量 private int addIndex, removeIndex, count; private Lock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); public BoundedQueue(int size) { items = new Object[size]; } // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位" public void add(T t) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[addIndex] = t; if (++addIndex == items.length) addIndex = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素 @SuppressWarnings("unchecked") public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[removeIndex]; if (++removeIndex == items.length) removeIndex = 0; --count; notFull.signal(); return (T) x; } finally { lock.unlock(); } } } ```
Condition的实现
`CondiitonObject`是同同步器`AbstractQueuedSynchronizer`的内部类 ,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也会是比较合理的,每个`Condition`对象都包含着一个队列(等待队列),该队列时`Condition`对象实现`等待/通知`功能的 关键。
等待队列
等待队列时一个`FIFO`的队列,在队列 的每个节点都包含一个线程引用,该 线程就是在`Condition`对象等待的线程,如果一个线程调用了`Condition.await()`方法,那么该线程将会 释放锁、构造节点加入等待队列进入 等待 状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的经静态内部类`AbstractQueuedSynchronizer.Node` 一个` Condition`包含一个等待 队列`Condition`拥有首节点(`fristWaiter`)和尾节点(`lastWriter`)。当前线程调用`Condition.await()`方法,将会以 当前线程构造节点,并将节点从尾部加入等待队列。
等待
调用`Condition` 的`await()`方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,当 从`await()`方法返回时,当前线程一定获取了`Condition` 相关联的锁, 如果从 队列 (同步队列和等待队列)的角度看`await()`方法,当调用`await()`方法时,相当于同步队列的首节点(获取了锁的节点)移动到`Condition`的等待队列中。
通知
调用`Condition`的`signal()`方法,将唤醒在等待队列中等待时间最长的节点(`首节点`),在唤醒节点之前,会将节点移动到`同步队列`中。
LockSupport
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作,LockSupport为构建同步组件的基础工具。 LockSupport定义了一组 以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。 Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开
其他
ThreadLocal
set(T):s设置一个值
Fork/Join
`Fork/Join`框架是`Java7`提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 - Fork:就是把一个大任务切分成若干个子任务并行的执行 - Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。 
Fork/Join的设计
分割任务
首先我们需要一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
执行任务合并结果
分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Disruptor并发框架
线程通信
共享内存
线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信
消息传递
线程之间必须通过发送消息来显示进行通信
并发集合
ConcurrentHashMap
ConcurrentHashMap是线程安全且高效的hahsMap.
什么是HashMap
高并发下的HashMap
因为在多线程环境下,使用`HashMap`进行`pu`t操作会引起死循环,导致`CPU`利用率接近100%,所以在并发情况下不能使用`HashMap`.
什么是ConcurrentHashMap
ConcurrentHashMap的结构
ConcurrentHashMap初始化
`ConcurrentHashMap`初始化方法是通过`initialCapacity`、`loadFactor`和`concurrencyLevel`等几个 参数来初始化`segment`数组、段偏移量`segmentShift`、段掩码`segmentMas`和每个`segment`里的 `HashEntry`数组来实现的。
ConcurrentHashMap允许多个读并发进行?
`ConcurrentHashMap`完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如`HashMap`中的实现,如果允许可以在`hash`链的中间添加或删除元素,读操作不加锁将得到不一致的数据。`ConcurrentHashMap`实现技术是保证`HashEntry`几乎是不可变的。`HashEntry`代表每个`hash`链中的一个节点,其结构如下所示: ``` static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; } ``` 可以看到除了`value`不是`final`的,其它值都是`final`的,这意味着不能从`hash`链的中间或尾部添加或删除节点,因为这需要修改`next` 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将`value`设置成`volatile`,这避免了加锁。
ConcurrentHashMap的size操作
如果我们要统计整个`ConcurrentHashMap`里元素的大小,就必须统计所有`Segment`里元素的大小后求和。`Segment`里的全局变量`count`是一个`volatile`变量,那么在多线程场景下,我们是不是直接把所有`Segment`的`count`相加就可以得到整个`ConcurrentHashMap`大小了呢?不是的,虽然相加时可以获取每个`Segment`的`count`的最新值,但是拿到之后可能累加前使用的`count`发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计`size`的时候把所有`Segment`的put,remove和clean方法全部锁住,但是这种做法显然非常低效。 因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以`ConcurrentHashMap`的做法是先尝试`2`次通过不锁住`Segment`的方式来统计各个`Segment`大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。 那么`ConcurrentHashMap`是如何判断在统计的时候容器是否发生了变化呢?使用`modCoun`t变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
ConcurrentHashMap get操作
`Segment`的`get`操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到`Segment`,再通过散列算法定位到元素,代码如下。 ``` public V get(Object key) { int hash =hash(key.hashCode()); return segmentFor(hash).get(key, hash); } ``` `get`操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读。 > 那么ConcurrentHashMap的get操作是如何做到不加锁的呢? 原因是它的`get`方法里将要使用的共享变量都定义成`volatile`类型,如用于统计当前`Segement`大小的`count`字段和用于存储值的`HashEntry`的`value`。定义成`volatile`的变量,能够在线 程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写 (有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量`count`和`value`,所以可以不用加锁。之所以不会读到过期的值,是因为根据Java内存模型的`happen before`原则,对`volatile字段的写入操作先于读操作,即使两个线程同时修改和获取`volatile`变量,`get`操作也能拿到最新的值,这是用`volatile`替换锁的经典应用场景。 ``` transient volatile int count; volatile V value; ```
ConcurrentHashMap put操作
由于`put`方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。`put`方法首先定位到`Segment`然后在Segment里进行插入操作。插入操作需要经历两个 步骤: 第一步判断是否需要对`Segment`里的`HashEntry`数组进行扩容, 第二步定位添加元素的位置,然后将其放在`HashEntry`数组里
ConcurrentLinkedQueue
`ConcurrentLinkedQueue`是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元 素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在 Michael&Scott算法上进行了一些修改
如何实现一个线程安全的队列?
1.使用阻塞算法
用一个锁(入队和出队用同一把锁)或两把锁(入队和出队用不同的锁)等范式来 实现
2.使用非阻塞算法
非阻塞的实现方式则 可以使用循环`CAS`的方式来实现
ConcurrentLinkedQueue
 `ConcurrentLinkedQueue`由`head`节点和`tail`节点组成,每个节点(`Node`)由节点元素(`item`)和 指向下一个节点(`next`)的引用组成,节点与节点之间就是通过这next关联起来,从而组成一张链表结构的队列。默认情况下`head`节点存储的元素为空,`tail`节点等于head节点。 ``` private transient volatile Node<E> tail = head; ```
入队列
入队列就是将入队节点添加到队列的尾部
出队列
java中的阻塞队列
阻塞队列(`BlockingQueue`)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除操作。 - **支持阻塞的插入方法**:意思是当队列满时,队列 会阻塞插入 元素的线程,直到队列不满。 - **支持阻塞 的移除方法**:意思是在队列为空时,获取元素的线程会等待队列变为非空。  - 抛出异常: 当队列满时 ,如果再往队列 里插入元素,会抛出IllegalStateException("Queue full")。当队列空时,从队列里获取元素会抛出throw new NoSuchElementException(); - 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。 - 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。 - 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
ArrayBlockingQueue
是一个用数组实现的有界阻塞 队列,按照先进先出(`FIFO`)的原则对元素进行排序。 ``` ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true); ``` 访问者的公平性是使用可重入锁实现的,代码如下。 ``` public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } ```
LinkedBlockingQueue
用链表实现的有界阻塞队列。此队列的默认和最大长度为 `Integer.MAX_VALUE`.此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue
`PriorityBlockingQueue`是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现`compareTo()`方法来指定元素排序规则,或者初始化 `PriorityBlockingQueue`时,指定构造参数`Comparator`来对元素进行排序。需要注意的是不能保证 同优先级元素的顺序。
DelayQueue
`DelayQueue`是一个支持延时获取元素的无界阻塞队列。队列使用`PriorityQueue`来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素
使用场景
缓存系统的设计
可以用`DelayQueue`保存缓存元素的有效期,使用一个线程循环查询`DelayQueue`,一旦能从`DelayQueue`中获取元素,表示缓存有效期到了。
定时任务调度
使用DelayQueue保存当前将会执行的任务和执行时间,一旦从`DelayQueue`中取到任务就开始执行,比如`TimerQueue`就是使用`DelayQueue`实现的。
SynchronousQueue
`SynchronousQueue`是一个不存储元素的阻塞队列。每一个`put`操作必须等待一个`take`操作,否则不能继续添加元素。 `SynchronousQueue`可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。`SynchronousQueu`e的吞吐量高于`LinkedBlockingQueue`和`ArrayBlockingQueue`。
LinkedTransferQueue
`LinkedTransferQueue`是一个由链表结构组成的无界阻塞`TransferQueue`队列。相对于其他阻 塞队列,`LinkedTransferQueue`多了`tryTransfer`和`transfer`方法。
LinkedBlockingDeque
`LinkedBlockingDeque`是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队 时,也就减少了一半的竞争
CopyOnWrite
介绍:Copy-On-Write 简称 COW,其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容 Copy 出去形成一个新的内容然后再改,这是一种延时懒惰策略
CopyOnWriteArrayList
CopyOnWriteArraySet
CopyOnWrite 容器介绍
`CopyOnWrite` 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy`,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我
使用场景
黑白名单
读多写少
优缺点
占内存(写时复制new两个对象)、不能保证数据实时一致性
线程池

ThreadPoolExecutor
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
构造参数
RejectedExecutionHandler
`RejectedExecutionHandler`(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolic
AbortPolicy
直接抛出异常。
CallerRunsPolicy
只用调用者所在线程来运行任务
DiscardOldestPolicy
丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy
不处理,丢弃掉
corePoolSize
`corePoolSize`(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。
maximumPoolSize
`maximumPoolSize`(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
keepAliveTime
keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
TimeUnit
TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒 (NANOSECONDS,千分之一微秒)。
runnableTaskQueue
`runnableTaskQueue`(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。
ArrayBlockingQueue
是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。
LinkedBlockingQueue
一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通 常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue
一个具有优先级的无限阻塞队列。
ThreadFactory
`ThreadFactory`:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。 ``` new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build(); ```
种类
FixedThreadPool
创建使用固定线程数FixedThreadPool的API
SingleThreadExecutor
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
CachedThreadPool
CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任的小程序,或者是负载较轻的服务器。
Callable和Future
ScheduledExecutorService
合理配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。 - 任务的性质:CPU密集型任务、IO密集型任务和混合型任务 - 任务的优先级:高中低 - 任务的执行时间:长中短 - 任务 的依赖性;是否依赖其他系统资源 CPU密集型任务应配置尽可能小的线程,如配置NCPU+1个线程的线程池, IO密集型任务线程并不是一直在执行任务 ,则应配置尽可能多的线程,如2*NCPU 。 可以通过`Runtime.getRuntime().availableProcessors()`方法获得当前设备的CPU个数 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行。
线程池的监控
在监控线程池的时候可以使用以下属性: - taskCount: 线程池需要执行的任务数量 - completedTaskCount: 线程池在 运行过程中已完成的任务数量,小于或等于taskCount。 - getPoolSize;线程池的线程数量,如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减 - getActiveCount; 获取活动的线程数
原子操作
基本类型
AtomicBoolean
AtomicInteger
`getAndIncrement`是如何实现原子操作的呢? ``` public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } ``` 源码中的for循环体的第一步先取得AtomicInteger里存储的数值 第二步对AtomicInteger的当前数值进行加1操作 第三步:调用compareAndSet方法来进行原子更新操作,该方法 先 检查当前数值是否等于current,等于意味着AtomicInteger的值没有被其他线程修改过,则将AtomicInteger的当前数值更新成next的值,如果不等compareAndSet方法会返回false,程序会进入for循环重新进行compareAndSet操作。
AtomicLong
数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
引用类型
原子更新基本类型的`AtomicInteger`,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。 ``` public class AtomicReferenceTest { public static AtomicReference<user> atomicUserRef = new AtomicReference<user>(); public static void main(String[] args) { User user = new User("conan", 15); atomicUserRef.set(user); User updateUser = new User("Shinichi", 17); atomicUserRef.compareAndSet(user, updateUser); System.out.println(atomicUserRef.get().getName()); System.out.println(atomicUserRef.get().getOld()); } static class User { private String name; private int old; public User(String name, int old) { this.name = name; this.old = old; } public String getName() { return name; } public int getOld() { return old; } } } ```
AtomicReference
AtomicReferenceArrayFieldUpdater
原子更新字段类
AtomicIntegerFieldUpdater
原子更新整型的字段的更新器。
AtomicLongFieldUpdater
原子更新长整型字段的更新器
AtomicStampedReference
原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。 ``` public class AtomicIntegerFieldUpdaterTest { // 创建原子更新器,并设置需要更新的对象类和对象的属性 private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater. newUpdater(User.class, "old"); public static void main(String[] args) { // 设置柯南的年龄是10岁 User conan = new User("conan", 10); // 柯南长了一岁,但是仍然会输出旧的年龄 System.out.println(a.getAndIncrement(conan)); // 输出柯南现在的年龄 System.out.println(a.get(conan)); } public static class User { private String name; public volatile int old; public User(String name, int old) { this.name = name; this.old = old; } public String getName() { return name; } public int getOld() { return old; } } ```
java如何实现原子操作
使用循环CAS实现原子操作
锁
锁机制保证了只有获得锁的线程才能够操作锁定的内存区域。`JVM`内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁。有意思的是除了偏向锁,`JVM`实现锁的方式都用了循环 `CAS`,即当一个线程想进入同步块的时候使用循环`CAS`的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁
内存模型
java 线程之间的通信由java内存模型(JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间 的共享变量存储在主内存中,每隔线程都有一个私有的本地内存,本地内存存储了该线程以读/写共享 变量的副本。
重排序
重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。
数据依赖性
如果两个操作访问统一变量,且这两个操作有一个是写操作,此时这两个操作之间就存在数据依赖性,
写一个变量之后,再读这个变量
写一个变量之后,再写 这个变量
读一个变量之后,再写 这个变量
重排序对多线程的影响
顺序一致性
数据竞争与顺序一致性
> JMM对正确同步的多线程程序的内存一致性做了如下保证。 如果程序时正确同步的,程序的执行将具有顺序一致性(Sequentially Consistent)--即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。这里的同步是指广义上的同步,包括对常用同步不原语(synchronized、volatile和final)的正确使用。
顺序一致性内存模型
> 顺序一致性内存模型有两大特性: - 一个线程中的所有操作必须按照程序的顺序来执行 - (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每隔操作必须原子执行且立刻对都有的线程可见。
同步程序的顺序一致性效果
未同步程序的执行特性
> 未同步程序 在JMM模型和顺序一致性模型有一下几个差异: - 顺序一致性模型保证 单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行。 - 顺序一致性模型保证所有线程只能看到一致的操作执行孙旭,而JMM不保证 所有线程看到一致的操作执行顺序。 - JMM不保证对64位的long型和double型的写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作具有与原子性
happens-beofre
定义
- 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行熟悉怒排在第二个操作之前。 - 两个操作之间存在happens-before关系,并不意味java平台的具体实现必须按照happens-before关系指定的顺序来执行。如果重排序后的执行结构,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(JMM允许这种重排序) `as-if-serial`语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结构不被改变。
规则
1、程序次序规则:在一个单独的线程中,按照程序代码的执行流顺序,(时间上)先执行的操作happen—before(时间上)后执行的操作。
2、管理锁定规则:一个unlock操作happen—before后面(时间上的先后顺序,下同)对同一个锁的lock操作。
3、volatile变量规则:对一个volatile变量的写操作happen—before后面对该变量的读操作
4、线程启动规则:Thread对象的start()方法happen—before此线程的每一个动作。</span>
5、线程终止规则:线程的所有操作都happen—before对此线程的终止检测,可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。
6、线程中断规则:对线程interrupt()方法的调用happen—before发生于被中断线程的代码检测到中断时事件的发生。
7、对象终结规则:一个对象的初始化完成(构造函数执行结束)happen—before它的finalize()方法的开始。
8、传递性:如果操作A happen—before操作B操作B happen—before操作C,那么可以得出happen—before操作C。
as-if-serial
- 不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结构不能被改变。 - 为了遵守as-if-serial语义,编译器和处理器不会存在数据依赖关系的操作 做重排序,但是如果操作之前不存在数据依赖关系,这些操作就可能被编译器和处理器重排序。 以下面例子为例:
JMM的内存可见性保证
- 单线程程序:单线程程序不会出现内存可见性问题。编译器、runtime和处理器会共同确保单线程程序的 执行结果与该程序在顺序一致性模型中的执行结果相同。 - 正确同步的多线程程序。正确同步的多线程程序的执行顺序一致性。这是JMM关注的重点,JMM通过限制编译器和处理器的重排序为程序员土工内存可见性保证。 - 未同步/未正确同步的多线程程序。 JMM为他们提供了最小安全保障
synchronized
synchronized原理
- 对于普通同步方法,锁是当前实例对象 - 对于静态同步方法,锁是当前类Class对象 - 对于同步方法块,锁是Synchronized括号里配置的对象 `JVM`基于进入和退出`Monitor`对象来显示方法同步和代码块同步,但两者的实现细节不一样。代码块同步是使用`monitorenter`和`monitorexit`指令实现的,而方法同步是使用另外一种方式实现的。 `monitorenter`指令是在编译后插入到同步代码块的开始位置,而`monitorexit`是插入到方法的结束处和异常处,JVM保证每个monitorenter必须有对应的monitorexit与之配对。