导图社区 并发编程
这是一篇关于并发编程的思维导图,在并发编程中,程序可以同时执行多个任务,从而提高程序的执行效率和响应速度。
编辑于2023-12-21 17:34:27并发编程
简介
三要素
原子性
不可分割,要么操作全部,要么失败
有序性
按照代码先后顺序执行
可见性
多个线程访问一个变量,其中一个修改,其他线程能立即获取到最新
线程五大状态
创建
new 新线程
就绪
调用 start 方法,不一定run,等CPU调
运行
CPU调了
阻塞
Sleep
死亡
执行完,或者异常
乐观与悲观锁
乐观
每次先执行
悲观
先加锁,加不到就阻塞
线程协作
wait
notify
notifyAll
synchronized
修饰类
修饰代码块
修饰方法
修饰静态方法
CAS
对比修改前后的两个数,会出现ABA问题
JDK11 就交 Compare And Set 之前是 Swap
线程池
提高性能
多线程
1、基础理论
特征
1、无论并发与否,启动必定创建 main 唯一线程
2、java 中线程共享程序所有资源,内存、打开文件
3、线程优先级
1、Thread.MIN_PRIORITY (1)
2、Thread.NORM_PROTIRY(5)默认5
3、Thread.MAX_PRIORITY (10)1-10之间,10优先级最高
4、可以创建 守护线程(daemon) 和 非守护线程
状态
NEW:Thread已经创建,但还没开始执行
RUNNABLE:正在运行
BLOCKED:正在等待锁定
WAITING:等待另一个线程动作
TIME_WAITING:等待另一个线程动作,但是有时间限制
方式
extends Thread
implements Runnable
MyCallable myCallable = new MyCallable(); FutureTask futureTask = new FutureTask(myCallable); // 启动线程,执行callable的业务 new Thread(futureTask).start(); // 同步等待callable的返回值 String result = futureTask.get();
可以返回内容
通信
共享内存
对共享内存区域读写实现通信
信号量
计数器,访问前获取信号量,访问后释放信号量
互斥锁
一个时间只能一个线程访问共享资源
条件变量
等待通知机制,不满足条件等待,满足才执行
管道
基于内存的通信,可具有父子进程间通信,或不同进程内不同线程间通信
消息队列
进程间通信机制,生产-消费模式
Thread
获取 Thread 对象信息
getId() 返回对象标识符,正整数,唯一且无法改变
getName、SetName,允许你获取和设置名称
getPriority、setPriority 获取和设置优先级
isDaemon、setDaemon 获取和建立守护条件
getState() 获取对象状态
interrupt() 中断目标线程
interrupted() 判断线程是否被中断 且清除
isInterrupted() 判断是否被中断,不清除
sleep(long ms) 暂停ms时间
join() 只有等这个线程执行完,才会继续主线程
在使用Thread.join()方法时,可以通过反复调用join方法来体现它的作用。具体来说,在主线程中创建多个子线程并启动,然后对这些子线程分别调用join方法,主线程就会等待每个子线程依次执行完毕后再继续执行下去。
yield() 暂停线程,让CPU分配给其他线程
setUncaughtExceptionHandler() 设异常处理器
currentThead() 返回Thread对象
synchronized
轻量阻塞 & 重量级阻塞
Interrupted
如何判断会抛出此异常,在调用 Interrupted 中断线程后,线程中会不会存在声明抛出 IntteruptedExcption 函数,如果有,则会抛出。 好处:取消线程的中断状态!
作用:中断线程 只对轻量级起作用
boolean interrupted = isInterrupted(); System.out.println("中断状态:" + interrupted);
何抛出此异常?
抛出,线程方法中抛出异常时终止
子线程代码中包含此代码块,其中抛出了 InterruptedExcption ,改变了线程状态!则抛出 try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
否则,不抛出
isInterrupted() 返回boolean当前中断状态
daemon
当线程设置为 daemon 线程后,决定线程是否关闭的条件是非守护线程是否关闭
关闭
thread.stop() 已经过时
设置变量,每循环运行通过变量判断,控制变量进行关闭
2、并发核心概念
并发与并行
并发:CPU切片迅速切换
并行:一时间,多个任务正在执行
线程切换就是并发,不切换,并行
同步
不可变对象
String
原子操作与原子变量
共享内存消息传递
消息传递(同步、异步)
3、并发问题
死锁
互斥
占有并等待资源
不可剥夺
循环等待
活锁
资源不足
优先权反转
4、JMM内存模型 指令重排
1、重排序内存可见性
编辑器重排
CPU指令重排
CPU内存重排
内存屏障
happen-before 内存可见性
volatile
功能
64位写入的原子性
内存可见性
禁止直连重排
JUC
并发容器
BlockingQueue 单链
方法
remove() 非阻塞
take() 阻塞
poll() 阻塞
ArrayBlockingQueue 解析 数组实现的环形队列
属性
Object[] items 容器
int takeIndex 队头指针
int putIndex 队尾指针
int count 长度
ReentrantLock lock 锁
Condition notEmpty 非空
Condition notFull 非满
方法
put
队列满则阻塞
put成功,通知非空条件
take
队列为空
take成功,通知非满条件
LinkedBlockingQueue 解析 单向链表的阻塞队列
属性
int capactiy 最大容量
count = new AtomicInteger(0) 原子变量
Node<E> head 头
Node<E> last 尾
两把锁,两个条件
构造器
指定容量,不指定Integer.MAX_VALUE
差异
1、提高并发度,2把锁控制头、尾,一个时间只能放一个、取一个,但是可以同时放和取。对于cont变量,则也是原子类
2、各自一把锁,需要调用对方 condition 和 signal 通知消费者、生产者
PriorityBlockQueue 解析 按优先级出队列
属性
Object[] qieie 容器
int size 大小
ReentrantLock lock; 锁
Condition notEmpty; 非满
构造器
不指定默认11长度
特点
没有满的情况
插入不阻塞,可以扩容
DelayQueue 解析 延迟队列,谁时间小谁先出
属性
lock = new ReentrantLock(); 锁
available = lock.newCondition();非空条件
PriorityQueue<E> q 优先级队列
SynchronousQueue 解析 特殊的 BlockingQueue 没有容量
构造器(boolean fair)
传入 true 公平 FIFO
new TransferQueue<E>()
传入 false 非公平
TransferStack<E>();
方法
transfer(E e,boolean timed,long nanos) e 为 null 取,非null 插值
特点
假设3个线程put,3个线程都会阻塞,直到其他线程调用3次 take
BlockingDeque 双端队列
方法
putFirst
Lputast
takeFirst
takeLast
抛出 InterruptedExcpetion
继承
BlockingQueue
Deque
LinkedBlockingDeque 解析 双向链表
属性
final int capacity 固定容量
int count 长度
class Node<E> 双向链表
Node<E> first 头 Node<E> last 尾
一把锁两个条件
new ReentrantLock();
notEmpty = lock.netCondition();
notFull = lock.newCondition();
写、读 互斥
CopyOnWrite 复制写
CopyOnWriteArrayList 解析 底层数组
方法
所有读方法都没有锁
add() 代码快里面有 synchronized 拷贝一份,新增进去
CopyOnWriteArraySet 解析 底层 CopyOnWriteArrayList (套娃)
如何元素有,就不添加
ConcurrentLinkedQueue/Deque 基于CAS 阻塞队列
ConcurrentHashMap 并发Map
init:初始长度你可以算,但不一定精确你给的值,他会算,保持2的整数次方 计算过程中,如果没有计算到正确的值,就 Thread.yield 自旋
put:4个分支
1、对每个桶进行初始化
2、对每个桶的元素初始化
3、扩容
数组长度大于 64 时,才会考虑把链表转为红黑树,否则直接扩容
先建一个2倍的新HashMap
问题
1、迁移过程中,写入到旧 hashMap 时,会创建一个 ForwardingNode 转发节点,确保不会引用失败
4、放入元素
1、加锁
判断链表还是红黑树,分别对其增加元素
最后如果是链表,当超过 bigCount 8 时,变为红黑树
ConcurrentSkipListMap/Set 有序key的HashMap
三层链表,有点类似于二分法,跳表查询,因为是有序的,所以可以定位到区间范围。
再操作之前还需要判断是否删除,因为可以能有其他线程也在操作。
同步工具类
Semaphore 信号量 基于 AQS
构造器 new Semaphore(5, false); 线程数、是否公平(公平效率高)
acquire() 抢锁 抢不到截停
release(int) 还锁,不传默认1
CountDownLatch 计数器 基于AQS
构造器 new CountDownLatch(5); 线程数
countDown() 计数器减一,小于0时,也可以减
await() 当=0时候通过,不到0不中断
CyclicBarrier 同步器
构造器 new CyclicBarrier(5, () -> { }); 等所有线程结束才会执行 void内方法
await() 结束线程 底层重入锁,判断是否全部完成,触发 void方法
Exchanger 线程交换数据
如果没有其他线程调用exchange,线程阻塞,直到有其他线程调用exchange为止。
exchanger.exchange("交换数据1")
Phaser 代替 CountDownLatch
分阶段进行同步 可以使多个线程到一个节点时,等待其他线程完成。再继续往下一个节点走
arrive()
awaitAdanc()
新特性:1、动态调整线程个数
register() 注册
bulkRegister(int) 注册多个
arriveAndDeregister() 解除注册
新特性:2、层次 Phaser
注册子节点
没有基于 AQS,但具备 AQS 特性
state 变量
CAS 操作
阻塞队列
Atomic
AtomicInteger
AtomicLong
AtomicBoolean
转成 Integer 再操作
AtomicBoolean
转成 Long 再操作
AtomicReference
底层+、- 都没用到 synchronized,而是 U.getAndAddInt ,CAS操作,先查再修改且自旋
解决 CAS的ABA问题
AtomicStampedReference 加版本、Integer
AtomicMarkableReference 加标记、boolean
Lock 和 Condition
Lock 是一个接口定义如下 void lock() 获取锁 常用 void lockInterruptibly() throws InterruptedExcpetion; 获取锁,是阻塞的,被其他线程中断 boolean tryLock(); 能获取到就获取,获取不到就返回执行其他工作 boolean tryLock(long tme,TimeUnit unit) throws... 给定时间获取锁。 void unlock(); 释放锁 常用 Condition newCondition();
互斥锁
锁的可重入性
当一个线程获取到锁后,进入临界区内,再次调用获取锁,仍然还可以获取到,就是可重入锁
类继承层次
锁的公平线和非公平性
公平排队,优点雨露均沾
不公平靠性能抢,优点减少线程切换
公平与非公平 Lock 差异
基于 AQS 分析 ReentrantLock
不公平,不排队,state ==0,直接将线程设置为排他线程,同时设置 state ,!= 0时,但是排他线程是当前线程,设置 state,否则返回 flase 获取失败
公平,state ==0 ,且队列没有等待线程,设置当前线程为排他线程,设置 state 的值,如果排他线程就是他自己,设置 state值,否则返回false 获取失败
阻塞队列与唤醒机制
底层调用 park() 方法,把自己阻塞,其他线程调用 unpark() 唤醒
解锁分析
解锁不管他公平还不公平
1、判断排他锁是不是自己线程,如果不是抛异常,因为只有获取到锁的才能释放
2、再设置 state,这里没有 CAS,因为是单线程的
3、最后 unpark(下一个节点)唤醒下个节点线程
lockInterruptibly()
不能被中断,如果判断被其他线程中断,直接抛异常
tryLock()
基于非公平锁的 tryAcquire(),对 state CAS操作,如果拿到锁就执行临界区,拿不到返回 false 也不阻塞
读写锁
和互斥锁相比,读写锁就是读,不互斥
继承层次
ReadWriteLock 接口
Lock readLock()
Lock writeLock()
读写锁基本实现
写是互斥,读是不互斥的
ReadLock()
如果写锁进程占用 或 当前线程不是排他线程 抢锁失败
如果锁值到极限 异常
CAS 设置 state 值
如果 state=0 当前线程是第一个读线程
如果写线程是当前线程 重入读锁
如果不是当前线程,则从 ThreadLock 中获取当前线程的读锁个数,并设置当前线程所持有的读锁个数
Condition
功能和 wati notify 类似
我们知道,wait notify 必须和 synchronized 一起使用,因此 Condition 也必须和 Lock 一起使用
属性
await 线程挂起 直到接受信号或被中断
awaitUnintterruptibly() await一样,但是不会响应中断请求
signal 唤醒一个正在等待的 condition 线程
ginalAll 唤醒所有正在等待的 condition 线程
awaitNanos(long time) 挂起,直到超时
await(long time,TimeUnit unit) 指定时间参数 挂起
await() 方法
1、线程调用 await 时候,肯定是先拿到锁的,所以执行 addConditionWaiter 内时不需要执行 CAS,线程天生安全
2、线程执行 wait 操作前,必须先释放锁,否则发生死锁,这个和 wait、nodity 与 synchronized 配合机制一样
3、线程从 wait 唤醒后,必须用acquireQueued 重新拿锁
4、检测 park 期间是否收到中断信号,
其他线程 unpark
收到线程中断
StampedLock 1.8 引入
区别
ReentrantLock 读读互斥、读写互斥、写写互斥
ReentrantReadWriteLock 读读不互斥、读写互斥、写写互斥
StampedLock 读读不互斥、读写不互斥、写写互斥
线程池与Future
线程池
实现原理
考虑问题
1、队列多长,如果无界,导致内存耗尽,如果有界满了咋办
2、线程池中线程是固定还是动态变化
3、每次提交任务,是入队列还是开线程
4、没有任务时,线程睡眠?还是阻塞?如果阻塞,怎么唤醒
解决办法
1、不用阻塞,所以不考虑阻塞唤醒,队列为空就睡眠,醒了就看队列有没有任务,不断循环
2、不使用阻塞队列,但是在队列外部实现阻塞唤醒
3、使用阻塞队列
继承体系
ThreadPoolExecutor
核心变量
1、AtomicInterger 线程池状态、线程个数 2个值
总共32位,高3位代表存储状态,后29位线程个数
状态
RUNNING -1
SHUTDOWN 0
STOP 1
TIDYING 2
TERMINATED 3
2、BlockingQueue 存放任务阻塞队列
3、ReentratLock 可重入锁
4、HashSet<Worker> 一个worker代表一个线程,Worker 继承AQS,本身就是锁
构造参数
1、corePoolSize 始终维护的线程数
2、maxPoolSize 在 corePoolSize 和 队列满的情况下,扩充线程到此值,可以说等待线程
3、keepAliveTime maxPoolSize中等待销毁时长
4、blockingQUeue 线程池用到的队列类型
5、threadFactory 线程工厂
6、RejectedExceptionHandler 如果满了,拒绝策略
4种拒绝策略
defaultHandler 默认 AbortPolicy() 抛异常
CallerRunspolicy 线程池放不下,自己干
DiscardPolicy 神不知鬼不觉 自己丢掉
DiscardOldestPolicy 删掉最早的任务
优雅关闭线程
方法
exectuor.shutdown() 中短空闲线程,不清除队列
exectuor.shudtownNow() 全干死
任务提交过程
方法
execute(Runable)
还有空位就执行,没有空位放队列,然后判断一下队列里有没有空位,有就执行,最后就放 maxPoolSize 里,直到放不下就拒绝策略
启动新线程 addWorked(Runable,boolean)
boolean 为 true corePoolSize 为上限
为 false maxPoolSize 为上限
Executors 工具类
newSingleThreadExecutor 单线程池
newFixedThreadPool(int) 固定大的线程池
newCachedThreadPool 接收一个请求,创建一个线程
ScheduledExecutorService 有调度的单线程池
newSchedulerdThreadPool(int) 多线程有调度
概要
阿里巴巴手册不让这样玩,避免风险
ScheduledTrheadPoolExecutor
实现了按时间调度来执行任务
两大类
延迟执行任务
schedul(....)
周期执行任务
scheduledAtFixedRate(...)
固定频率,跟你任务执行多长时间没关系
scheduedWithFixedDelay(...)
按时间间隔,下一次执行时间 取决于任务执行多长+间隔时间
原理 继承了 ThreadPoolExecutor,依赖 DelayQueue,这是 BlockingQUeue 的一种,二叉堆
CompletableFuture 异步编程⼯具
功能:将任务交给线程工作,Main线程 get() 阻塞 等待工作线程完成,再输出工作线程的内容 作用:工作线程和Main线程交互
在调用 get() 前后,打印 future,会有。Not competed 和 Completed norally 区别
runAsync、supplyAsnc
runAsync 异步给 futrue 指派任务,无返回值
supplyAsync 异步阻塞,有返回值
thenRun、thenAccept、thenApply
future.thenRun(new Runnable() 紧接着干下一件事情 拿不到返回值
tthenAccept(new Consumer<String>() 可以拿到返回值 String 类型
thenApply(new Function<String, Integer>() 第一个接受参数,第二个返回参数
四种原型
Runable - runAsync、thenRun
Consumer - thenAccept
Supplier - supplierAsync
Fuction - thenApply
thenCompose、thenCombine
在上面thenApply多个套娃的情况下,一口气拿到值
allOf、anyOf
allOf 全部完成,再返回
anyOf 只有要一个返回,则返回
内部原理 基于 ForkJoinPool
RecurisiveTask 有返回值
RecurisveAction 无返回值
核心数据结构
环形队列
线程池状态 ctl
分成5个部分,每个部分的值表示状态
阻塞栈 Trebier Stack 实现多个队列阻塞唤醒
ctl 变量初始化,通过5个部分状态判断
Worker 阻塞唤醒机制
阻塞-进栈
唤醒-出栈
任务提交
内部提交push
外部提交
工作窃取算法 乐观锁
join、fork
fork 放任务
如果是工作线程 放入局部队列
客户端线程 放入公共队列
join 嵌套阻塞
怎么唤醒? 循环判断状态调用,当前线程允许完了,要通知其他线程
优雅关闭
线程允许中会判断线程池状态
方法
图形化:任务呈网状形,有向无环
多线程设计模式
Single Thread Execution 模式
概念
线程
临界区
何时使用
1、多线程时
2、状态可能发生变化
3、需要确保安全性
Immutable 模式
没有共享资源,不需要锁
Guarded suspension 模式
请求 等待、等待返回成功前阻塞
Balking 模式
停止并返回,这里不阻塞,允许不了就直接停了
也可以等待一段时间,超出等待时间直接返回
通过异常通知超时
Futrue.get
Exchanger.exchange
Cyclicarrier.await
CountDownLatch.await
通过返回值通知超时
BlockingQueue.poll 返回 null
Semaphore.tryAcquire 为 false 时
Lock.tryLock 为false时
Producer-Consumer 模式
生产者消费者模式 如果都只有一个,我们也可以称为管道 Pipe模式
Read-Write Lock 模式
采用了 Guarded suspension 模式
要点
利用读写操作线程之间不会冲突提高程序性能
适合读取操作负载较大的情况
适合少写多读
Juc 中
ReentrantReadWriteLock、ReadWriteLock
Thread-Per-Message 模式
将工作内容交给线程,主线程返回
Worker Thread 模式
线程池
优点
提高吞吐量
控制容量
调用执行分离
多态Request角色
Futrue 模式
延时等待获取结果
锁的基本原理
1、需要state变量,标记锁的状态,至少2个值
当 state >1 时表示锁支持可重入
当 state = 0 表示没有线程持有锁
当 state = 1 有一个锁
2、需要记录当前是哪个线程持有锁
3、需要底层支持对一个线程进行阻塞、唤醒
LockSupport 工具类
void park()
当前线程调用 park 线程阻塞
void unpark()
其他线程调用 unpark(Thread) 唤醒阻塞线程
4、需要队列维护所有阻塞线程,这个队列必须是线程安全的无锁队列,也需要使用CAS
阻塞队列是 AQS 的核心
底层通过 一个双向链表 和两个指针实现
有返回值
无返回值
有参
无参