导图社区 极客时间:Java并发编程实战
这是一篇关于Java并发编程实战的思维导图。包括学习攻略并发编程bug源头,如何解决可见性,有序性,等待通知,优化循环,安全活跃,性能等知识点,感兴趣的小伙伴可以下载收藏哟。
编辑于2022-11-24 10:01:54 浙江省Java并发编程实战
学习攻略
跳出来 看全景
并发分为三个核心问题
分工
类比为 菜只能一个个的做 对应生产者
但是服务员可以一下上很多菜 对比消费者
同步
指线程间的协作 一个线程执行完了任务 通知另一个线程开工
还可以是 当一个条件不满足 线程等待 条件满足时 线程被唤醒
互斥
为了保证线程的安全性存在的
同一时刻只有一个线程允许访问共享变量
全景图

全景图2

钻进去 看本质
解决某个问题 一定有背后的理论作为支撑
精选评论

相关书籍
《Java并发编程实战》作者阵容可谓大师云集,也包括Doug Lea 《Java并发编程的艺术》讲解并发包内部实现原理,能读明白,内功大增 《图解Java多线程设计模式》并发编程设计模式方面的经典书籍 《操作系统:精髓与设计原理》经典操作系统教材 http://ifeve.com 国内专业并发编程网站 http://www.cs.umd.edu/~pugh/java/memoryModel/ 很多并发编程的早期资料都在这里
并发编程bug源头
并发程序幕后的故事
核心矛盾 速度:cpu>内存>io
所以单单提高cpu的性能是无法提高整体的速度的 为此 人们做出了很多优化
CPU 增加了缓存,以均衡与内存的速度差异;
操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;
编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
但是这样也带来了其他问题
缓存导致的可见性问题
单核时代 不会出现什么可见性问题

可见性
一个线程修改共享变量 另一个线程也能看到
图示

两个线程同时执行1w次的+1操作
结果接近1w 但永远不会到2w
代码
public class Test { private static long count = 0; private void add10K() { int idx = 0; while (idx++ < 10000) { count += 1; } } public static void main(String[] args) throws InterruptedException { System.out.println(calc()); } public static long calc() throws InterruptedException { final Test test = new Test(); // 创建两个线程,执行add()操作 Thread th1 = new Thread(() -> { test.add10K(); }); Thread th2 = new Thread(() -> { test.add10K(); }); // 启动两个线程 th1.start(); th2.start(); // 等待两个线程执行结束 th1.join(); th2.join(); return count; } }
原因在于修改后的变量还没有被同步进内存 而又从内存中取值了
线程切换带来的原子性问题
如上面的count+=1
实际上对应三条cpu指令
将内存中的值取出放入到cpu寄存器
执行+1操作
将对应的值刷新回内存
看似很正确 但是在多线程的情况下 这三条指令并不是原子性操作
这就会造成线程1执行到一半的时候线程2也来执行
图示

原子性
多个操作在cpu中不被中断
编译优化带来的有序性问题
编译器优化会将我们认为完美的程序执行顺序调换
图示

代码
public class Singleton { static volatile Singleton instance; static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); } } } return instance; } public static void main(String[] args) { HashSet<String> set = new HashSet<>(); for (int i = 0; i < 10000; i++) { new Thread(() -> { set.add(Singleton.getInstance().hashCode() + ""); }).start(); } System.out.println(set); } }
红色部分不可缺少
我们以为的new一个对象
分配内存M
内存M上初始化Singlton对象
将M的内存地址赋值给instance变量
实际上
分配内存M
将M的内存地址赋值给instance变量
内存M上初始化Singlton对象
所以代码后面还需要判断下 instance是否为空 这就是双重校验
实际上加入volatile就多了写屏障 写之前的指令不会被重排 synchronized包裹不完全 所以会有重排序的问题
课后问题
常听人说,在 32 位的机器上对 long 型变量进行加减操作存在并发隐患,到底是不是这样呢?现在相信你一定能分析出来。
因为long类型占64位 那么就需要两个32位才能组成long 加减的时候需要多条指令组合 这样在多线程下就可能出现重排或原子性问题
如何解决可见性、有序性
什么是java内存模型
导致可见性、有序性问题的原因在于
缓存 编译优化
那么禁用不就好了?
导致性能下降
按需禁用才是明智之选
volatile
最原始的意义就是
禁用cpu缓存
我的理解是 认为缓存是不可靠的 每次读取数值都要从内存中去拿 自然能够保证数据的一致了
其实有两个作用
主存读保证可靠
读写屏障保证有序
案例
// 以下代码来源于【参考1】 class VolatileExample { int x = 0; volatile boolean v = false; public void writer() { x = 42; v = true; } public void reader() { if (v == true) { // 两个线程 执行完 writer 这里x会是多少呢? } } }
线程A写 线程B读
jdk5前可能是0 42, 5之后肯定是42了
因为5之前只保证了可见性 无法保证有序性
所以对volatile做了加强
使得程序遵照hb原则
Happens-Before 规则
前面一个操作对后一个的操作是可见的
还约束了编译期优化 虽允许优化 但是需要遵守它的规则
程序的顺序性规则
x对于v的操作是可见的 单线程思维理解 否则改了等于白改?
回过头来更正下 其实指的是 如果v为true了 那么前面的变量的写操作必定是已经发生了
还可以理解为 v为true 那么x必定为42
volatile 变量规则
对volatile变量的写操作先于读操作
也就是说写操作后所有线程都会感知到变量变化 重新从主内存中读取 底层用的就是内存读写屏障
传递性
a先于b b先于c 则a先于c
示例

12 步骤相当于 规则1
v为true x必定为42 因为v是volatile 加了些屏障
23 步骤相当于 规则2
v为true了 由于有读屏障 x读到的也是最新值
所以1到4 4能够直接看到修改
我的理解是volatile强制将cpu缓存的变量刷新到内存中
恍然大悟 再看之前的例子 因为先把x设置为了42 再把v设置为了true
而线程B读的时候如果为true 那么必定能够读到42
管程中锁的规则
synchronized就是java相对于管程的实现
互斥隐式加锁解锁都由编译器实现
示例代码
synchronized (this) { //此处自动加锁 // x是共享变量,初始值=10 if (this.x < 12) { this.x = 12; } } //此处自动解锁
线程1运行完 线程2能够立刻看到x=12
线程 start() 规则
在线程1执行start()前 主线程对变量的修改 线程1都是能够感知的 或者说执行前就知道了
示例代码
Thread B = new Thread(()->{ // 主线程调用B.start()之前 // 所有对共享变量的修改,此处皆可见 // 此例中,var==77 }); // 此处对共享变量var修改 var = 77; // 主线程启动子线程 B.start();
线程 join() 规则
示例代码
Thread B = new Thread(()->{ // 此处对共享变量var修改 var = 66; }); // 例如此处对共享变量修改, // 则这个修改结果对线程B可见 // 主线程启动子线程 B.start(); B.join() // 子线程所有对共享变量的修改 // 在主线程调用B.join()之后皆可见 // 此例中,var==66
关键在于join起了一个插队的作用 你插队买了个包子 店里(主线程)的包子少了一个 主线程能感知到吗 当然可以啊
被我们忽视的 final
对象逸出问题
// 以下代码来源于【参考1】 final int x; // 错误的构造函数 public FinalFieldExample() { x = 3; y = 4; // 此处就是讲this逸出, global.obj = this; }
另一个线程访问的时候 可能发现obj是空的情况
建议就是不要在构造中直接赋予外部变量this
总结
1. 为什么定义Java内存模型?现代计算机体系大部是采用的对称多处理器的体系架构。每个处理器均有独立的寄存器组和缓存,多个处理器可同时执行同一进程中的不同线程,这里称为处理器的乱序执行。在Java中,不同的线程可能访问同一个共享或共享变量。如果任由编译器或处理器对这些访问进行优化的话,很有可能出现无法想象的问题,这里称为编译器的重排序。除了处理器的乱序执行、编译器的重排序,还有内存系统的重排序。因此Java语言规范引入了Java内存模型,通过定义多项规则对编译器和处理器进行限制,主要是针对可见性和有序性。 2. 三个基本原则:原子性、可见性、有序性。 3. Java内存模型涉及的几个关键词:锁、volatile字段、final修饰符与对象的安全发布。其中:第一是锁,锁操作是具备happens-before关系的,解锁操作happens-before之后对同一把锁的加锁操作。实际上,在解锁的时候,JVM需要强制刷新缓存,使得当前线程所修改的内存对其他线程可见。第二是volatile字段,volatile字段可以看成是一种不保证原子性的同步但保证可见性的特性,其性能往往是优于锁操作的。但是,频繁地访问 volatile字段也会出现因为不断地强制刷新缓存而影响程序的性能的问题。第三是final修饰符,final修饰的实例字段则是涉及到新建对象的发布问题。当一个对象包含final修饰的实例字段时,其他线程能够看到已经初始化的final实例字段,这是安全的。 4. Happens-Before的7个规则: (1).程序次序规则:在一个线程内,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操作。准确地说,应该是控制流顺序而不是程序代码顺序,因为要考虑分支、循环等结构。 (2).管程锁定规则:一个unlock操作先行发生于后面对同一个锁的lock操作。这里必须强调的是同一个锁,而"后面"是指时间上的先后顺序。 (3).volatile变量规则:对一个volatile变量的写操作先行发生于后面对这个变量的读操作,这里的"后面"同样是指时间上的先后顺序。 (4).线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作。 (5).线程终止规则:线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。 (6).线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。 (7).对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。 5. Happens-Before的1个特性:传递性。 6. Java内存模型底层怎么实现的?主要是通过内存屏障(memory barrier)禁止重排序的,即时编译器根据具体的底层体系架构,将这些内存屏障替换成具体的 CPU 指令。对于编译器而言,内存屏障将限制它所能做的重排序优化。而对于处理器而言,内存屏障将会导致缓存的刷新操作。比如,对于volatile,编译器将在volatile字段的读写操作前后各插入一些内存屏障。
课后问题
有一个共享变量 abc,在一个线程里设置了 abc 的值 abc=3,你思考一下,有哪些办法可以让其他线程能够看到abc==3?
多种方法
abc加volatile保证可见性
加锁
join方法 线程a里设置abc start后再join 其他线程能看到abc为3
想要偷鸡

让我理解了volatile读屏障的作用 即 y如果为4 那么x必定为3
解决原子性
原子性问题的发生在于线程切换 单核时代没问题 但是多核时代会发生两个线程同时操作某个值 所以只要保证同一时刻对共享变量的修改是互斥的 即只有一个线程执行 就能保证原子性了
简易锁模型

相当于在你家里给全部的东西上锁 防止小偷来偷 这样代价较大 为什么不设个门锁呢?
改进锁模型

关键在于我们锁需要保护的东西就是对应的资源 可以说是某个变量
可能出问题的地方
锁住了错误的资源
锁的粒度太大 导致性能下降
锁技术synchronized
隐式规则
修饰静态 锁当前类
this就是锁当前对象
锁和受保护资源
一个锁可以保护多个资源 虽然也可以加多把锁 但是意义不大
切记 不要对两个资源 对象和类同时上锁 对象不同 就不会同步了
保护没有关联关系的多个资源
代码
class Account { // 锁:保护账户余额 private final Object balLock = new Object(); // 账户余额 private Integer balance; // 锁:保护账户密码 private final Object pwLock = new Object(); // 账户密码 private String password; // 取款 void withdraw(Integer amt) { synchronized(balLock) { if (this.balance > amt){ this.balance -= amt; } } } // 查看余额 Integer getBalance() { synchronized(balLock) { return balance; } } // 更改密码 void updatePassword(String pw){ synchronized(pwLock) { this.password = pw; } } // 查看密码 String getPassword() { synchronized(pwLock) { return password; } } }
可以使用两把锁保护不同的资源 保证并行
当然也可以直接用this 这样就是一把锁 所有操作都是串行的了
保护有关联关系的多个资源
转账保证并发
代码
class Account { private int balance; // 转账 synchronized void transfer( Account target, int amt){ if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } }
但其实存在错误

一把锁保护两个对象的资源
假设两个线程同时运行 其实都能够进入transfer方法 因为锁都是当前对象 看似是一把锁 但是target是不同的
一锁锁多个资源的情况要注意 这种情况下是错误的
使用锁的正确姿势
让所有对象持有一个唯一对象
class Account { private Object lock; private int balance; private Account(); // 创建Account时传入同一个lock对象 public Account(Object lock) { this.lock = lock; } // 转账 void transfer(Account target, int amt){ // 此处检查所有对象共享的锁 synchronized(lock) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } }
这样所有Account都共享这个变量了
但是如果创建的是不同对象 还是没有用的 lock也不同了
除非该对象是单例且不可变的 否则可能出现不同锁的情况 警惕!
解决
可以使用Account.class 锁类就可以了

课后问题

不可用可变对象做锁 否则对象一变锁就变了 会引发线程安全问题
这里给我提了个醒 之前没有意识到 一般都是对象内部创建obj作为锁 但不是也有this作为锁吗 这样不就违反了吗
加锁的本质就是在所对象的对象头中写入当前线程id 这里不涉及锁升级的过程 后续可补充
一不小心就死锁
向现实要答案
之前说了 锁整个类会出现串行化 而我们要的是并行 所以说锁的粒度太大了
模拟银行转账三种情况
转入账本和转出账本都有 都拿到 一边增加 一边减少
只有一个账本 无法转账 那就先拿一本 另一本拿到了再做增减
两个账本都没有 等被送回的时候再做操作
编程世界

对两个转账对象一起加锁 而不是锁整个类什么的 降低了锁的粒度
代码
class Account { private int balance; // 转账 void transfer(Account target, int amt){ // 锁定转出账户 synchronized(this) { // 锁定转入账户 synchronized(target) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } } }
那么代价是什么?
可能会出现死锁问题

两边同时要转账 各持有自己锁的同时去请求对方的资源
资源图

如何预防死锁
形成死锁的四个条件
互斥条件
资源ab只能被一个线程占有
不可破坏 因为我们要的就是互斥
请求保持
在线程持有A的同时去请求B资源
不可抢占
线程不能强行抢占别人的资源 只能等他们释放
循环等待
线程1请求线程2的资源 线程2请求线程1的资源 形成闭环
破坏死锁
破坏请求保持
一次性请求所有资源 比如之前有两个账本 请求的话需要一次性都拿到
现实世界

新增一个管理员
当张三要转账 但只有一个账本 那就不能够给他 只有另一个账本归位才能够转账
编程世界
代码
/** * @author sharpbb * @date 2022/5/24 14:09 */ public class Demo2 { public static void main(String[] args) { Coordinator coordinator = Coordinator.INSTANCE; Account account1 = new Account(coordinator, 100,"小明"); Account account2 = new Account(coordinator, 100,"李狗蛋"); for (int i = 0; i < 20; i++) { new Thread(() -> { account1.transfer(account2, 10); }).start(); new Thread(() -> { account2.transfer(account1, 5); }).start(); } } } /** * 保证单例不可变 */ class Coordinator { public static final Coordinator INSTANCE = new Coordinator(); public static List<Object> list = new ArrayList<>(); /** * 获取两个账户资源才能转账 * * @param from 对象1 * @param to 对象2 * @return 布尔值 */ public boolean getResource(Object from, Object to) { synchronized (INSTANCE) { //只有在两个对象都没有获取时 才加入 if (!list.contains(from) && !list.contains(to)) { list.add(from); list.add(to); return true; } else { return false; } } } public void remove() { synchronized (INSTANCE) { list.clear(); } } } @AllArgsConstructor class Account { /** * 协调者 */ private Coordinator coordinator; /** * 余额 */ private int balance; /** * 名称 */ private String name; /** * 转账 * * @param other 另一个账户 * @param money 转账金额 */ public void transfer(Account other, int money) { //三步走 判断 干活 通知 // 判断如果获取到两个资源 则不进入循环 否则自旋 while (!this.coordinator.getResource(this, other)) { ; } //先获取自己的锁 try { synchronized (this) { //再获取别人的锁 synchronized (other) { //有余额才转账 if (this.balance >= money) { this.balance -= money; other.balance += money; System.out.println(Thread.currentThread().getName() + "----- "+this.name+"转账" + money + "元 账户余额:" + this.balance +" "+ other.name+"账户:" + other.balance + "元"); } } } } catch (Exception e) { e.printStackTrace(); } finally { //释放资源 不释放就会一直阻塞的 coordinator.remove(); } } }
其实就是新增一个类 这里将两个对象放入集合 模拟拿到两个账本 转账完毕就删除
该类负责申请和释放资源
转账流程三步走
判断
是否拿到两个账本 否则等待
干活
拿到账本(两个对象) 执行转账操作
解锁
这里没有通知 但需要及时解锁
破坏不可抢占条件
synchronized做不到申请不到资源 主动释放
但是Lock可以做到主动释放
破坏循环等待条件
其实就是对资源进行排序 然后按序申请资源
代码
public class Account { private int id; private int balance; public Account(int id, int balance) { this.id = id; this.balance = balance; } public void transfer(Account other, int amt){ Account left = this; Account right = other; //为了保证有序 当this的id小于right的id 则交换元素 if (left.id > right.id) { left = other; right = this; } //之后必定是left的id要小 synchronized (left) { synchronized (right) { if (this.balance >= amt) { //当前账户扣钱 this.balance -= amt; //转入账户加钱 other.balance += amt; System.out.println(Thread.currentThread().getName() + "转账" + amt + "元 账户余额:" + this.balance+"对方账户:"+other.balance+"元"); } } } } public static void main(String[] args) { Account a1 = new Account(1,100); Account a2 = new Account(2,100); for (int i = 0; i < 10; i++) { new Thread(() -> { a1.transfer(a2, 10); }, "张三").start(); new Thread(() -> { a2.transfer(a1, 5); }, "李四").start(); } } }
很巧妙的方法 之前可能序号是乱的 但是经过排序 申请的资源一致了 就可以做到同步了
如果不排序 将会出现死锁
总结下
13都可以解决程序带来的死锁问题 但是1不仅代码长 且还有死循环 所以3 破坏循环等待 对资源进行排序的方法就很优秀了
回答下提出的问题
while虽然会有短暂的cpu空跑 但性能还是优于synchronize解锁且要串行执行
等待通知优化循环等待
之前的代码 死循环会有性能问题 该如何解决呢?
方案就是 当线程不满足条件 则阻塞自己 进入等待 满足条件 则唤醒
完美的就医流程
患者在门诊就诊
尝试获取锁
让患者去做检查
某个状态条件没有满足 等待
患者被叫到
获取到锁
患者就诊完毕 通知下一个
唤醒下一个线程
synchronized实现等待通知
不满足条件 wait进入等待

生命周期

notify 通知某线程再次尝试

注意 再次进入还是要获取锁 因为之前释放过锁了
改良代码
/** * @author sharpbb * @date 2022/5/24 14:09 */ public class Demo2 { public static void main(String[] args) { Coordinator coordinator = Coordinator.INSTANCE; Account account1 = new Account(coordinator, 100, "小明"); Account account2 = new Account(coordinator, 100, "李狗蛋"); for (int i = 0; i < 20; i++) { new Thread(() -> { account1.transfer(account2, 10); }).start(); new Thread(() -> { account2.transfer(account1, 5); }).start(); } } } /** * 保证单例不可变 */ class Coordinator { public static final Coordinator INSTANCE = new Coordinator(); public static List<Object> list = new ArrayList<>(); /** * 获取两个账户资源才能转账 * * @param from 对象1 * @param to 对象2 * @return 布尔值 */ public synchronized void getResource(Object from, Object to) { //如果其中一个资源被获取了 那么就等待 while (list.contains(from) || list.contains(to)) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(from); list.add(to); } /** * 通知写在这里了 先释放资源 然后通知将所有等待线程放入锁池 准备抢锁 */ public synchronized void remove() { list.clear(); notifyAll(); } } @AllArgsConstructor class Account { /** * 协调者 */ private Coordinator coordinator; /** * 余额 */ private int balance; /** * 名称 */ private String name; /** * 转账 * * @param other 另一个账户 * @param money 转账金额 */ public void transfer(Account other, int money) { //三步走 判断 干活 通知 // 判断如果获取到两个资源 则不进入循环 否则自旋 coordinator.getResource(this, other); //先获取自己的锁 try { synchronized (this) { //再获取别人的锁 synchronized (other) { //有余额才转账 if (this.balance >= money) { this.balance -= money; other.balance += money; System.out.println(Thread.currentThread().getName() + "----- " + this.name + "转账" + money + "元 账户余额:" + this.balance + " " + other.name + "账户:" + other.balance + "元"); } } } } catch (Exception e) { e.printStackTrace(); } finally { //这里就是真正的通知了 我就说之前的代码怎么少了通知 coordinator.remove(); } } }
尽量使用 notifyAll()
notify随机唤醒一个线程进入锁池 而notifyall唤醒所有线程进入锁池
所以前者可能会导致有些线程永远不会唤醒 造成小规模的线程饥饿
安全 活跃 性能
安全性问题
其实就是让程序按照我们期望执行
但难道每个程序都要分析 是否满足之前所说的三个条件吗?
其实并不用 只需要 存在共享数据 且该数据会发生变化 也就是多个线程会读写同一数据的情况下 就必须要满足了 否则很容易出现问题
数据竞争
代码
public class Test { private long count = 0; void add10K() { int idx = 0; while(idx++ < 10000) { count += 1; } } }
多个线程同时运行就会发生 每个线程拿到该变量 如两个线程拿到的时候都是50 执行+1 都变成51 这样中间就损失掉了
加synchronized并不能解决问题
代码
public class Test { private long count = 0; synchronized long get(){ return count; } synchronized void set(long v){ count = v; } void add10K() { int idx = 0; while(idx++ < 10000) { set(get()+1);//这里可能两个线程拿到的值都是5 都执行+1操作 那么同时设置值为6 但我们希望的是值变成7 } } }
竞态条件
程序的执行结果依赖于线程的执行顺序
即 两个线程同时+1 结果可能为1 可能为2 是不确定的
这就说明该程序存在竞态条件
解决其实说白了 就是要互斥 加锁即可
活跃性问题
死锁
之前提了 略
活锁
两个线程在执行过程中发生连续碰撞 造成阻塞
给线程加随机重试时间即可解决
线程饥饿
线程无法访问所需资源造成无法执行的情况
主要用公平锁来解决 一个个线程排队按顺序执行
性能问题
阿姆达尔定律

简单来讲就是并发最大提升为 1/串行率=20倍
解决方案
锁带来的性能问题
使用无锁算法和数据结构
线程本地存储 tls
写时复制
乐观锁
原子类
disruptor无锁队列
减少持有锁的时间
使用更细粒度的锁
ConcurrentHashMap 分段锁技术
读写锁 读无锁 写互斥
性能指标
吞吐量
单位时间内能处理的请求数量
越大越好
延迟
发出请求到相应的时间
越小越好
并发量
同时能处理的请求数量
并发增加 延迟也增加
延迟是基于并发量的
即 并发为1000 延迟50ms
小练习
该代码是线程安全的吗?
代码
public void add(Vector vector,Object o){ if (!vector.contains(o)) {//判断是否存在和add 并不是原子的 可能多个线程都判断不包含o 同时add vector.add(o); } }
管程
什么是管程
管理类的成员变量和方法 让这个类是线程安全的
MESA 模型
两大核心问题
互斥
同一时刻只有一个线程能够访问对应资源
同步
线程间的通信 协作
管程解决互斥问题
将共享变量与操作封装加锁

管程解决同步问题
模拟阻塞队列的出队入队过程

伪代码
public class BlockedQueue<T> { final Lock lock = new ReentrantLock(); // 条件变量:队列不满 final Condition A = lock.newCondition(); // 条件变量:队列不空 final Condition B = lock.newCondition(); // 入队 void enq(T x) { lock.lock(); try { while (bq.isFull()) { // 等待队列不满 A.await(); } bq.add(x); //入队后,通知可出队 notEmpty.signal(); } finally { lock.unlock(); } } // 出队 void deq() { lock.lock(); try { while (bq.isEmpty()) { // 等待队列不空 notEmpty.await(); } bq.poll(); //出队后,通知可入队 notFull.signal(); } finally { lock.unlock(); } } }
wait的正确姿势
MESA模型独有 需要在while里面用wait 防止虚假唤醒
本质上是因为wait等待后 线程被唤醒后需要再次判断 因为条件变量可能发生了变化 if就做不到了
三种模型通知执行
Hasen
t2通知完结束 t1再去执行 保证同步
Hoare
t2通知完阻塞 t1马上执行 t1执行完 唤醒t2
多了一次阻塞唤醒
Mesa
t2通知完 继续执行 t1再去排队
之前可能t1满足了条件 但现在又要验证了 所以要用while判断
notify什么时候用
之前说过了 除非深思熟虑 否则一律使用notifyAll
条件
所有线程拥有相同的等待条件
如上面阻塞队列满了 等待
所有线程被唤醒 执行相同的操作
唤醒后 干活 通知
只需要唤醒一个线程
总结
重点理解阻塞队列 加锁解锁 出队入队的条件变量 等待唤醒操作
注意新模型是通知后 另一个线程是再去排队而不是立刻执行的
Java线程
生命周期
通用线程生命周期

java将运行和可运行合并了 休眠细分了
java线程生命周期

线程状态图

状态切换
runnable与blocked
只有一种场景会触发转换 看到synchronized时 runnable->blocked 而抢到锁的时候 又从blocked->runnable
注意
jvm不关心 操作系统调度相关的状态
即是运行态还是可运行态什么的
调用阻塞式api 线程阻塞 指的是操作系统线程的状态 而不是java线程
有点疑惑 比如调用io阻塞方法 其实线程也是在runnable状态吗
runnable与waiting
三种场景触发转换
wait()
在同步块中 调用wait方法 一直等待 直到被其他线程唤醒
thread.join()
执行A.join() 那么执行这个指令的线程会被A插队 直到A执行完毕 而被插队的线程进入等待从 runnable->waiting 当A执行完毕 waiting->runnable
LockSupport.park()
juc里面的锁都是基于这玩意实现的 park runnable->waiting
unpark waiting->runnable
runnable与timed_waiting
有五种 都是调用带超时参数的方法
调用带超时参数的 Thread.sleep(long millis) 方法; 获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法; 调用带超时参数的 Thread.join(long millis) 方法; 调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法; 调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
new与runnable
继承Thread 重写run方法
实现Runnable 重写run方法
new->runnable 只需要调用start即可
runnbale到terminated
强制中断run方法 stop与interrupt 后者更好 前者过时了
stop与interrupt的区别
stop会强行中断线程 如果用了lock加锁 会无法解锁 太弟弟了
interrupt通知线程该结束了 只是起通知的作用
interrupt具体做法
通过异常
当A线程处于wait或timed_waiting时 其他线程调用A.interrupt() A线程会返回runnable态 之后抛出InterruptedException 异常 主要用来中断一直阻塞的线程
处于Runnable但操作系统层面处于阻塞状态时
当线程 A 处于 RUNNABLE 状态时,并且阻塞在 java.nio.channels.InterruptibleChannel 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 会触发 java.nio.channels.ClosedByInterruptException 这个异常; 而阻塞在 java.nio.channels.Selector 上时,如果其他线程调用线程 A 的 interrupt() 方法,线程 A 的 java.nio.channels.Selector 会立即返回。
主动检测
主动用 isInterrupted() 判断是否为true 为true表面需要中断了 自己执行中断操作 而不是被动被别人中断
练习

解答
public static void main(String[] args) throws InterruptedException { Thread th = Thread.currentThread(); new Thread(()->{ try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } th.interrupt(); }).start(); while(true) { if(th.isInterrupted()) { break; } // 省略业务代码无数 try { Thread.sleep(100); }catch (InterruptedException e){ //注意这里因为线程的状态被中断 直接抛出异常 且清除标志位 其实就是调用两次interrupt false->true->false System.out.println(Thread.currentThread().isInterrupted());//打印false //所以要再重置下标志位 改为true Thread.currentThread().interrupt(); System.out.println(Thread.currentThread().isInterrupted());//打印true e.printStackTrace(); } } }
重点注意 intrrupt调用一次就使得状态取反 第一次为true 第二次为false
而当在有异常的情况下 会抛出异常 且清理标志位
创建多少线程?
多线程的应用场景
如何降低延迟 提高吞吐量?
优化算法
将硬件性能发挥到极致
简单讲就是提高io和cpu的利用率
其实就是利用多线程
比如计算100e 一个线程计算 和四个线程并行计算 每个计算25e 速度就提升了4倍
创建多少线程合适
cpu密集型计算
对于4核cpu 只需要4个线程即可 多了反而会增加线程切换的成本 但是为了防止其他意外 一般设置线程数为核数+1
io密集型计算
最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
为什么局部变量是线程安全的
方法是如何执行的
图示

每一个方法都对应一个栈帧 里面包含局部变量表和操作数栈
前者存放局部变量 后者执行一些计算的步骤
所以方法中的局部变量都是独立的 不管有多少线程执行方法 对应的都是不同的局部变量
一切都是共享惹的祸 没有共享 就没有线程安全问题 各个都是独立的
如何用面向对象思想写好并发程序
封装共享变量
共享变量变量封装作为对象的属性 访问该对象采取同步策略
示例代码
public class Counter { private long value; synchronized long get(){ return value; } synchronized long addOne(){ return ++value; } }
这样无论读写都是线程安全的 且读和写只能同时进行一个
对于一些不会发生变化的共享变量 加final 使其不可变
既能避免并发问题
还能表明自己的设计意图 已经考虑过并发安全问题了
识别共享变量间的约束条件
示例代码
public class Test7 { public static void main(String[] args) { SafeWM safeWM = new SafeWM(); //这里设置下限为2 上限为10 safeWM.setLower(2); safeWM.setUpper(10); //但这里会引发一个情况 两个线程同时执行 设置上限满足大于2 设置下限满足小于10 同时都能设置成功 //这就是所谓的竞态条件 new Thread(()->{ safeWM.setUpper(5); }).start(); new Thread(()->{ safeWM.setLower(7); }).start(); //当然不好模拟 但还是存在的 System.out.println(safeWM.getLower().get()+" "+safeWM.getUpper().get()); } } class SafeWM { public AtomicLong getUpper() { return upper; } public AtomicLong getLower() { return lower; } // 库存上限 private final AtomicLong upper = new AtomicLong(0); // 库存下限 private final AtomicLong lower = new AtomicLong(0); // 设置库存上限 void setUpper(long v) { // 检查参数合法性 if (v < lower.get()) { throw new IllegalArgumentException(); } upper.set(v); } // 设置库存下限 void setLower(long v) { // 检查参数合法性 if (v > upper.get()) { throw new IllegalArgumentException(); } lower.set(v); } // 省略其他业务代码 }
这个代码会有竞态条件 导致两个线程同时运行 上下限设置会有问题
换言之 看似是线程安全的 实则并没有解决
对两个方法加锁可以解决同步问题
所以要识别出共享变量的约束条件 特别是有if的时候 看似没问题但背后其实是有并发问题的
制定并发访问策略
避免共享
使用线程本地存储或者为每个任务分配独立的线程
使用管程及同步工具
使用juc的包或并发容器
三条宏观原则
优先使用成熟的工具类
不要自己造轮子 何德何能写的比他们好啊
迫不得已才用同步原语 synchronized lock 等 需要小心使用
当然 先做出来 在做好 想不到更好的 先用锁就行
Lock和Condition
既然synchronize做了优化 为什么还要再造Lock相关类呢?
显然并不是因为性能问题而去再造轮子 而是因为synchronize确实有解决不了的问题
问题有三
synchronize做不到申请不到锁主动释放资源 也就是说死锁情况下的不可抢占条件 synchronize无法破坏
synchronize无法支持超时 即未获取到锁就进入阻塞状态 如果可以返回一个错误 这样的话这个线程有机会释放曾经持有的锁 避免了长期持有锁的情况
synchronize未获取到锁就进入阻塞 而如果可以直接返回 那么也可以让其释放原先持有的锁 和②类似
新的API就可以解决这些问题
// 支持中断的API void lockInterruptibly() throws InterruptedException; // 支持超时的API boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 支持非阻塞获取锁的API boolean tryLock();
lockInterruptibly
这个是直接中断 不让其再运行的 有些偏激 用法就是通过外部jxm获取死锁信息 中断对应的线程
tryLock带超时
代码
public class Demo7 { public static void main(String[] args) throws InterruptedException { Chopstick c1 = new Chopstick("1"); Chopstick c2 = new Chopstick("2"); Chopstick c3 = new Chopstick("3"); Chopstick c4 = new Chopstick("4"); Chopstick c5 = new Chopstick("5"); new Philosopher("苏格拉底", c1, c2).start(); new Philosopher("柏拉图", c2, c3).start(); new Philosopher("亚里士多德", c3, c4).start(); new Philosopher("赫拉克利特", c4, c5).start(); new Philosopher("阿基米德", c5, c1).start(); } } @Data class Chopstick extends ReentrantLock { private String name; public Chopstick(String name) { this.name = name; } } @Slf4j class Philosopher extends Thread { private Chopstick left; private Chopstick right; public Philosopher(String name, Chopstick left, Chopstick right) { super(name); this.left = left; this.right = right; } @Override public void run() { while (true) { try { if (left.tryLock(100, TimeUnit.MILLISECONDS)) { try { if (right.tryLock(100, TimeUnit.MILLISECONDS)) { log.info("吃饭~"); } } catch (Exception e) { log.info("获取其他锁超时 主动释放",e); } finally { right.unlock(); } } } catch (Exception e) { //如果获取锁超时 那么会进入这里 log.info("获取其他锁超时 主动释放",e); } finally { left.unlock(); } } } }
如果超时主动释放锁且会有异常 个人觉得第三种方式 直接返回更好一些
Lock如何保证可见性?
内部维护一个volatile修饰的state变量 主要是遵循了happen before的前三条原则 值得细细品味
用锁最佳实践
永远只在更新对象的成员变量时加锁
永远只在访问可变的成员变量时加锁
永远不在调用其他对象的方法时加锁
另一个区别 Lock中可包含多个条件变量
以之前的阻塞队列为例
维护两个变量 队列不空 队列不满
代码复习
/** * @author sharpbb * @date 2022/5/26 11:15 */ public class BlockQueue<T> { /** * lock锁 */ private static final ReentrantLock REENTRANT_LOCK = new ReentrantLock(); /** * 条件变量:队列不满 */ private static final Condition QUEUE_NOT_FULL = REENTRANT_LOCK.newCondition(); /** * 条件变量:队列不空 */ private static final Condition QUEUE_NOT_EMPTY = REENTRANT_LOCK.newCondition(); /** * 出队方法 当队列为空的时候 阻塞 * @return 返回出队元素 */ public T poll() { REENTRANT_LOCK.lock(); try { //队列为空 不能出队 只能阻塞 while(queue.isEmpty()){ //等待队列不为空 QUEUE_NOT_EMPTY.await(); } //出队操作 T t=queue.poll(); //通知队列 QUEUE_NOT_FULL.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { REENTRANT_LOCK.unlock(); } return t; } public void add() { REENTRANT_LOCK.lock(); try { //队列满了 不能添加 while (queue.isFull()) { //等待队列不满 QUEUE_NOT_FULL.await(); } //否则就可以进行入队操作 queue.add(); //通知队列 QUEUE_NOT_EMPTY.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); }finally { REENTRANT_LOCK.unlock(); } } }
出队方法中
需要判断队列是否空了 空了则等待
入队方法中
需要判断队列是否满了 满了也等待
注意 不能用wait和notify之类的方法 否则会死的很惨
异步调用与异步方法
前者是 调用方再开一个子线程调用特定方法
后者是在方法里面开一个线程执行其他业务逻辑
dubbo
好像是以前的代码 现在没找到
大致就是 rpc调用方法是异步的 然后得到结果的方法线程阻塞 等待返回结果 唤醒该线程 就是等待通知的逻辑
代码用哪个signal更好一些
// 创建锁与条件变量 private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); // 调用方通过该方法等待结果 Object get(int timeout){ long start = System.nanoTime(); lock.lock(); try { while (!isDone()) { done.await(timeout); long cur=System.nanoTime(); if (isDone() || cur-start > timeout){ break; } } } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(); } return returnFromResponse(); } // RPC结果是否已经返回 boolean isDone() { return response != null; } // RPC结果返回时调用该方法 private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } }
之前的代码用的singal()
不能说是错误的 但不安全 后面做了修改 说明了就算是权威也会出错 不要迷信权威
还是要看实际场景 这里的代码是 结果已经拿到了 那么所有线程应该都应唤醒 而不是只唤醒一个 但可能有些场景只需要唤醒一个线程 如果唤醒了全部 还会再次阻塞 那么就使用signal 但一般来讲signalAll的情况多一些
课后思考
是否存在死锁问题?
class Account { private int balance; private final Lock lock = new ReentrantLock(); // 转账 void transfer(Account tar, int amt){ while (true) { if(this.lock.tryLock()) {//想象下两者同时转账 那么两个对象就各种持有自己的锁 尝试锁成功 但获取对方的锁失败 之后不断尝试 这就是活锁的一种体现 try { //且就算余额扣减了 解锁了 也还是会执行这个循环 // 所以有两个个问题 活锁退让 死循环 可以加随机重试时间 然后转账成功加break if (tar.lock.tryLock()) { try { this.balance -= amt; tar.balance += amt; } finally { tar.lock.unlock(); } }//if } finally { this.lock.unlock(); } }//if }//while }//transfer }
信号量
复用对象池
public class ObjPool<T, R> { public final List<T> pool; // 用信号量实现限流器 public final Semaphore sem; /** * 构造函数 */ public ObjPool(int size, T t) { pool = new Vector<T>() { }; for (int i = 0; i < size; i++) { pool.add(t); } sem = new Semaphore(size); } /** * 利用对象池的对象,调用func */ @SneakyThrows R exec(Function<T, R> func) { T t = null; sem.acquire(); try { //这里的remove其实是取出元素 //① t = pool.remove(0); return func.apply(t); } finally { //② pool.add(t); //释放凭证 sem.release(); } } public static void main(String[] args) { // 创建对象池 ObjPool<Customer, String> pool = new ObjPool(10, new Customer("bb")); for (int i = 0; i < 10; i++) { new Thread(()->{ // 通过对象池获取t,之后执行 pool.exec(t -> { //对象用完就被释放 再用再取 System.out.println(t); return t.toString(); }); }).start(); } } } @Data @AllArgsConstructor class Customer{ private String name; @Override public String toString() { return this.hashCode() + ""; } }
多个线程重用多个对象 但这个对象其实是同一个 与单例不同的是 只有前n个线程可以使用到n个相同对象 剩余的需要等待线程用完释放后才可使用
在管程没出来之前 很多问题都是靠信号量解决的 它可以解决多个线程访问受限资源 类似于限流器的功能
获取凭证就是类似的获取锁 只有有凭证的线程才能够进行某些操作 操作完毕后就释放凭证 让下一个线程继续
思考题
在上面对象池的例子中,对象保存在了 Vector 中,Vector 是 Java 提供的线程安全的容器,如果我们把 Vector 换成 ArrayList,是否可以呢?
不可以 因为多个线程可以同时remove 或者add 会有线程安全问题
读写锁
适用于读多写少的场景 满足三个条件
读读共享
读写互斥
写写互斥
可以用读写锁实现缓存
注意在读缓存时加了读锁 但其他读锁也可以进入 分为两种情况
读数据缓存中存在 那么直接返回
读数据缓存中不存在 如果是一般加锁 那肯定是去db查询 但加了读锁 再次尝试读取(可能其他线程更新了缓存) 然后再去查db 放入缓存
特别注意
读锁不能升级 如果错误的升级为写锁 那么会导致线程阻塞
写锁可以降级为读锁 还支持条件变量
StampedLock
比读写锁更快
区别在于
它是读写锁的子集 有写锁 悲观写锁 乐观读 注意乐观读是无锁的
不支持重入
不支持条件变量
不要傻傻的调用interrupt操作 容易造成cpu飙高 而要用 readLockInterruptibly() 和写锁 writeLockInterruptibly()
关键还是不大会用
后续研究
CountDownLatch和CyclicBarrier
可优化流程

两个操作完全可以并行 后面操作串行 关键点在于 后面操作需要等待前面操作完成
初始代码
public class Demo5 { public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { getOrder1(); }); Thread t2 = new Thread(() -> { getOrder2(); }); t1.start(); t2.start(); t1.join(); t2.join(); check(); save(); } public static void getOrder1() { System.out.println("获取订单1"); } public static void getOrder2() { System.out.println("获取订单2"); } public static void save() { System.out.println("保存订单"); } public static void check() { System.out.println("检查差异"); } } 获取订单1 获取订单2 检查差异 保存订单
使用线程插队方法join
但是要避免频繁创建线程
使用CountDownLatch
public class Demo5 { public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); //用循环模拟查询 CountDownLatch countDownLatch = new CountDownLatch(2); pool.execute(() -> { getOrder1(); countDownLatch.countDown(); }); pool.execute(() -> { getOrder2(); countDownLatch.countDown(); }); countDownLatch.await(); //虽然是用主线程调用 但其实会被阻塞 除非计数器减到0 check(); save(); } public static void getOrder1() { System.out.println("获取订单1"); } public static void getOrder2() { System.out.println("获取订单2"); } public static void save() { System.out.println("保存订单"); } public static void check() { System.out.println("检查差异"); } }
再次并行

循环栅栏

代码
public class Demo5 { private static ExecutorService callBackPool = Executors.newFixedThreadPool(1); private static ExecutorService Pool = Executors.newFixedThreadPool(2); private static CyclicBarrier barrier = new CyclicBarrier(2, () -> { //提交后再次执行 callBackPool.execute(Demo5::commit); }); private static Vector<Order> orders1=new Vector<>(); private static Vector<Order> orders2=new Vector<>(); public static void main(String[] args) { //连续执行5次 模拟查询 run(); run(); run(); run(); run(); } public static void run() { Pool.execute(()->{ orders1.add(getOrder1()); try { //等待 只有当其他线程完成工作 才会继续 barrier.await(); } catch (Exception e) { e.printStackTrace(); } }); Pool.execute(()->{ orders2.add(getOrder2()); try { //等待 只有当其他线程完成工作 才会继续 barrier.await(); } catch (Exception e) { e.printStackTrace(); } }); } /** * 回调方法 从两个队列中获取元素 检查差异并保存 */ public static void commit() { Order order1 = orders1.remove(0); Order order2 = orders2.remove(0); Order check = check(order1, order2); save(check); } public static Order getOrder1() { System.out.println("获取订单1"); return new Order("BB"+new Random().nextInt(80000000)); } public static Order getOrder2() { System.out.println("获取订单2"); return new Order("BB"+new Random().nextInt(80000000)); } public static void save(Order order) { System.out.println("保存订单"+order); } public static Order check(Order order1,Order order2) { System.out.println("检查差异 只留一个"); return order1; } } @Data @AllArgsConstructor class Order{ private String name; }
这里单线程去取数据 防止了并发操作队列造成的数据问题 很关键
并发容器
看图

List
CopyOnWriteArrayList

适合写少读多的场景
读的时候是在原数组读
写的时候是将数组进行拷贝一份 写完后指针指向新数组 所以频繁的写就会有频繁的拷贝
Map

ConcurrentSkipListMap是有序的 ConcurrentHashMap的key是无序的
kv都不能为null
Queue
单端阻塞队列
PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队
ArrayBlockingQueue
基于数组
LinkedBlockingQueue
基于链表
LinkedTransferQueue
性能比LinkedBlockingQueue更好
双端阻塞队列
LinkedBlockingDeque
单端非阻塞队列
ConcurrentLinkedQueue
双端非阻塞队列
ConcurrentLinkedDeque
注意不要使用无界队列即可 防止OOM
原子类
基本都是使用了cas自旋无锁解决并发问题 我的感受是 原子类每个方法是原子的 但是获取和设置都要放在循环里 防止更新不成功
课后思考
public class SafeWM { class WMRange{ final int upper; final int lower; WMRange(int upper,int lower){ //省略构造函数实现 } } final AtomicReference<WMRange> rf = new AtomicReference<>( new WMRange(0,0) ); // 设置库存上限 void setUpper(int v){ WMRange nr; WMRange or = rf.get(); do{ // 检查参数合法性 if(v < or.lower){ throw new IllegalArgumentException(); } nr = new WMRange(v, or.lower); }while(!rf.compareAndSet(or, nr)); } }
是否有并发问题?
自然是有的 rf.get获取的时候只获取了一次 后面的while都会用旧的值且不会更新
如何创建正确的线程池?
线程池本质上是生产消费者模型
使用者是生产者 线程池是消费者
将任务丢入线程池中 可能直接消费 有可能放入阻塞队列中 慢慢消费 直到完成
线程池参数就不说了 有个新增的方法
allowCoreThreadTimeOut(boolean value)
设置为核心线程也可超时 最少会保留一个线程
关于拒绝策略
一般来讲不重要的 可以直接丢弃 而如果重要的内容 则通过降级手段
将任务信息插入mysql或mq来做后续处理
线程池的建议
建议不同类别的业务用不同的线程池,至于线程池的数量,各自计算各自的,然后去做压测。虽然你的系统有多个线程池,但是并不是所有的线程池里的线程都是忙碌的,你只需要针对有性能瓶颈的业务优化就可以了。
Future
如何获取任务执行结果
Future相关方法
// 取消任务 boolean cancel( boolean mayInterruptIfRunning); // 判断任务是否已取消 boolean isCancelled(); // 判断任务是否已结束 boolean isDone(); // 获得任务执行结果 get(); // 获得任务执行结果,支持超时 get(long timeout, TimeUnit unit);
取消方法
cancel()
判断任务是否取消
isCancelled()
判断任务是否结束
isDone()
这个可以配合while延时来防止阻塞
获取结果
get 有参的包含超时
这个get很关键 一般都是主线程调用返回的结果
但如果一个执行task的线程很慢 那么会导致主线程阻塞住
三个重载的submit
Future submit(Runnable task);
一般我们都是要返回值的 这个感觉没啥用
Future submit(Callable task);
可通过返回的Future来获取返回值
<T> Future submit(Runnable task, T result);
主子线程数据共享
@SpringBootTest class WebdemoApplicationTests { @Test void contextLoads() throws InterruptedException, ExecutionException, TimeoutException { mytest(); } static ExecutorService executorService = Executors.newFixedThreadPool(2); public void mytest() throws InterruptedException, ExecutionException, TimeoutException { Result result = new Result(); //第一个参数是Runnable 第二个是任意泛型 最关键的是主子线程可以实现数据共享 Future<?> future = executorService.submit(new Task(result), result); System.out.println(future.get()); } } @Data class Result{ private String name; } @Data @AllArgsConstructor class Task implements Runnable{ private Result result; @Override public void run() { result.setName("bb"); System.out.println("给result赋值"); } }
但具体用在哪就不是很清楚了
FutureTask
之前是通过方法返回值 但其实我们可以构造一个FutrueTask传入线程池 获取结果也是用这个
烧水泡茶的例子
 @SpringBootTest class WebdemoApplicationTests { @Test void contextLoads() throws Exception{ mytest(); } static ExecutorService pool = Executors.newFixedThreadPool(2); /** * 泡茶流程 * 思路 * 两个线程 t1洗茶壶 烧开水 泡茶 但t1依赖于t2线程的茶叶 * 而t2执行较快 所以将茶叶提供给t1线程即可 * 最终打印t1t2的结果 */ public void mytest() throws Exception { Task2 task2 = new Task2(); FutureTask<String> futureTask2 = new FutureTask<>(task2); Task1 task1 = new Task1(futureTask2); FutureTask<String> futureTask1 = new FutureTask<>(task1); pool.submit(futureTask1); pool.submit(futureTask2); //最终得到结果 直接用futureTask获取结果比较方便 System.out.println(futureTask1.get()); //最终结果 //洗水壶 //洗茶壶 //洗茶杯 //拿茶叶 //烧开水 //泡茶 //上茶:西湖龙井 } } @AllArgsConstructor class Task1 implements Callable<String> { private final FutureTask<String> futureTask; @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(1); System.out.println("洗水壶"); TimeUnit.SECONDS.sleep(10); System.out.println("烧开水"); //获取茶叶 String res = futureTask.get(); System.out.println("泡茶"); return "上茶:" + res; } } class Task2 implements Callable<String> { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(1); System.out.println("洗茶壶"); TimeUnit.SECONDS.sleep(2); System.out.println("洗茶杯"); TimeUnit.SECONDS.sleep(1); System.out.println("拿茶叶"); return "西湖龙井"; } }
t1内部依赖t2的结果茶叶 任务的执行是被封装在FutureTask中执行的
总结
我们遇到一些串行任务改成并行 画图很重要 而不是一昧的蛮干 这样很容易出错 比如画出上面的烧水泡茶的图 如何执行就一目了然了
课后思考
@SpringBootTest class WebdemoApplicationTests { @Test void contextLoads() throws Exception{ mytest(); } static ExecutorService pool = Executors.newFixedThreadPool(3); /** * 方案优化 询价应用 从三个电商平台获取价格 并保存到db中 但这是串行的 改用并行解决 * 向电商S1询价,并保存 * r1 = getPriceByS1(); * save(r1); * 向电商S2询价,并保存 * r2 = getPriceByS2(); * save(r2); * 向电商S3询价,并保存 * r3 = getPriceByS3(); * save(r3); * <p> * 其实只需要从线程池中取三个线程 执行代码即可 关键在于结果是短板效应 会阻塞 考虑用轮询? */ @Autowired private UserService userService; public void mytest() throws Exception { FutureTask<String> task1 = new FutureTask<>(() -> { TimeInterval timer = DateUtil.timer(); String priceByS1 = userService.getPriceByS1(); System.out.println("t1用时:"+timer.interval()); return priceByS1; }); FutureTask<String> task2 = new FutureTask<>(() -> { TimeInterval timer = DateUtil.timer(); String priceByS2 = userService.getPriceByS2(); System.out.println("t2用时:"+timer.interval()); return priceByS2; }); FutureTask<String> task3 = new FutureTask<>(() -> { TimeInterval timer = DateUtil.timer(); String priceByS3 = userService.getPriceByS3(); System.out.println("t3用时:"+timer.interval()); return priceByS3; }); pool.submit(task1); pool.submit(task2); pool.submit(task3); TimeInterval timer = DateUtil.timer(); String res = ""; while (true) { //完成了就存db if (task1.isDone()) { res += task1.get(); break; } else { TimeUnit.MILLISECONDS.sleep(10); } } while (true) { //完成了就存db if (task2.isDone()) { res += task2.get(); break; } else { TimeUnit.MILLISECONDS.sleep(10); } } while (true) { //完成了就存db if (task3.isDone()) { res += task3.get(); break; } else { TimeUnit.MILLISECONDS.sleep(10); } } save(res); System.out.println("主线程用时:"+timer.interval()); } private void save(String s) { System.out.println("数据["+s+"]存入数据库"); } }
这里采用三个线程分别执行操作 用时是短板查询 采用了轮询的操作 而后批量将数据插入到db
CompletableFuture
CompletableFuture 的核心优势
三个线程烧茶泡水

代码
@SpringBootTest class WebdemoApplicationTests { @Test void contextLoads() throws Exception { mytest(); } static ExecutorService pool = Executors.newFixedThreadPool(3); private void mytest() { //t1线程 CompletableFuture<Void> c1 = CompletableFuture.runAsync(() -> { sleep(1); System.out.println("洗水壶"); sleep(10); System.out.println("烧开水"); }, pool); //t2线程 CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> { sleep(1); System.out.println("洗茶壶"); sleep(2); System.out.println("洗茶杯"); sleep(1); System.out.println("拿茶叶"); return "西湖龙井"; }, pool); //t3线程 CompletableFuture<String> c3 = c1.thenCombine(c2, (res1, res2) -> { System.out.println("拿到茶叶:" + res2); System.out.println("泡茶"); return "上茶-" + res2; }); //这里的join是插主线程的队 否则主线程运行完就看不到结果了 System.out.println(c3.join()); } @SneakyThrows private void sleep(int s) { TimeUnit.SECONDS.sleep(s); } }
比之前的代码简单很多 且清晰明了 值得研究争取在项目中使用
代码清晰明了 无需手工维护线程
强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
如何理解 CompletionStage 接口
描述串行关系
thenApply、thenAccept、thenRun 和 thenCompose
使用
@SneakyThrows private void mytest() { //t1线程 CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "hello"); //1.在原基础上操作并返回新值 CompletableFuture<String> c2 = c1.thenApply(s -> s + " world"); //2.消费掉c2的返回值 c2.thenAccept(System.out::println); //3.无参无返回 可能中间做个延时操作 c2.thenRun(() -> { sleep(1); }); //4.还可以再次联合CompletableFuture做操作 其实就是thenApply但放在内部执行了 c2.thenCompose(res -> CompletableFuture.supplyAsync(() -> res + "!!!")) //再输出下 .thenAccept(System.out::println); }
必须等到某个执行完毕后再执行后续的
描述 AND 汇聚关系
thenCombine、thenAcceptBoth 和 runAfterBoth
使用
@SneakyThrows private void mytest() { //t1线程 CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> "world"); //1.将两个结果合并 c1.thenCombine(c2, (r1, r2) -> r1 + r2); //2.合并消费掉 无返回值 c1.thenAcceptBoth(c2, (r1, r2) -> { System.out.println(r1 + r2); }); //3.用不到其中的返回值 无参无返回 感觉用的不多 c1.runAfterBoth(c2, () -> { sleep(1); }); }
描述 OR 汇聚关系
applyToEither、acceptEither 和 runAfterEither
使用
@SneakyThrows private void mytest() { //t1线程 CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> "world"); //1.c1 c2 哪个结果先出来就使用哪个 c1.applyToEither(c2, res -> res + "!!!") .thenAccept(System.out::println); //2.这个同理 但得到的结果必须消费掉 c1.acceptEither(c2, System.out::println); //3.无参无返回 特定场景使用 c1.runAfterEither(c2, () -> { sleep(1); }); }
异常处理
exceptionally()相当于catch 而whenComplete不返回值 而handler返回值 相当于finally
使用
@SneakyThrows private void mytest() { //t1线程 CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> "world"); //执行计算 CompletableFuture.supplyAsync(() -> (7 / 0)) //如果正确 则结果*10 .thenApply(r -> r * 10) //如果错误返回0 .exceptionally(e -> -1) //虽然上面是异常 但到了下面变成了res 而e为null .handle((res,e)-> res+999) //最终打印正确或错误的结果 .thenAccept(System.out::println); }
思考题

异常未处理 且应该使用自定义的线程池
否则操作阻塞 所有使用默认线程池的操作都会停滞
我觉得还可以加超时操作 get超时操作会抛异常 处理下 有该异常不让其通过
CompletionService
使用阻塞队列解决问题
private static void useBlockedQueue() throws InterruptedException { LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); ExecutorService pool = Executors.newFixedThreadPool(3); Future<Integer> submit1 = pool.submit(() -> query1()); Future<Integer> submit2 = pool.submit(() -> query2()); Future<Integer> submit3 = pool.submit(() -> query3()); pool.execute(() -> { try { queue.put(submit1.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); pool.execute(() -> { try { queue.put(submit2.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); pool.execute(() -> { try { queue.put(submit3.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); for (int i = 0; i < 3; i++) { Integer take = queue.take(); pool.execute(() -> { save(take); }); } }
自己使用下 CompletionService
//自己使用下CompletionService private static void useCompletionService() throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); ExecutorCompletionService<Integer> cs = new ExecutorCompletionService<>(pool); cs.submit(Demo3::query1); cs.submit(Demo3::query2); cs.submit(Demo3::query3); for (int i = 0; i < 3; i++) { Integer res = cs.take().get(); pool.execute(()->{ save(res); }); } }
传一个参数是线程池 默认用的是无界阻塞队列
传两个参数 第二个是阻塞队列 推荐应该设置为有界的
返回最快相应的服务查询的价格
//查询三个服务 最先得到结果的直接返回 取消其他任务 private static void quickQueryService() { ExecutorService threadPool = Executors.newFixedThreadPool(3); //构造需要自己传入线程池 ExecutorCompletionService<Integer> cs = new ExecutorCompletionService<>(threadPool); List<Future> futures = new ArrayList<>(); //加入futures数组 因为我们后面要遍历该数组取消任务 futures.add(cs.submit(Demo3::query1)); futures.add(cs.submit(Demo3::query2)); futures.add(cs.submit(Demo3::query3)); //正式开发要计算下共有几个查询的服务 Integer res = cal(cs, futures); //可以打印或者保存到db System.out.println(res); save(res); } private static Integer cal(ExecutorCompletionService<Integer> cs, List<Future> futures) { Integer res = null; for (int i = 0; i < 3; i++) { try { res = cs.take().get(); //如果一个服务有结果了 直接返回 if (res != null) { break; } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { for (Future future : futures) { future.cancel(true); } } } return res; }
总结
这个类可以将任务批量执行 而后通过阻塞队列的方式获取结果 比单纯的get阻塞方法来的好 但我的问题是 如果任务获取完了 队列还阻塞咋办?
思考题

有三个问题
如果直接调用 会直接获取价格 应等到所有线程执行完毕再获取
原子类的获取和设置不是原子的
不应该吞噬异常
不变模式
需要保证
类和属性都是 final 的,所有方法均是只读的
而如果变量是引用类型 值的内容是不能被修改的
String是返回的新对象
为了解决频繁创建对象
可以使用享元模式/对象池
无状态
具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。 其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法。除了无状态的对象,你可能还听说过无状态的服务、无状态的协议等等。 无状态有很多好处,最核心的一点就是性能。 在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好; 在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。
课后思考

通过getUser 获取sb 再通过append即可改变对象内容
写时复制
用到的领域
操作系统
子进程fork 读的时候指向父进程 只有发生改动才会重新赋值一份进程地址空间
各种文件系统
docker git
函数式编程
所有的修改操作都需要cow
之前并不流行函数式编程 只不过硬件提升了才慢慢热门
总结
cow是一种很常见的解决线程安全的手段
但缺点就是消耗内存 但现在稍微消耗一些也没什么了
课后思考
为什么没有cowLinkedList呢?
因为链表本就读取较慢 不适合这种读多写少的场景 与cow天然不一致
线程本地存储
工作原理
因为我们需要多个线程共享同一个变量 而同一个线程两次get得到的内容是相同的 那么自然就可以想到ThreadLocal的结构了
即内部有一个map 线程为key 变量为v

但java中的实现是不一样的

是为线程创建了ThreadLoal
最主要的原因是为了防止内存泄漏
如果按照我们的设计 即便某个线程结束 但还是在tl内部 不使用了却回收不了
这里的key -> tl是弱引用 只要线程被回收了 就能回收tlMap
线程池里的tl内存泄漏
因为线程池中的线程是复用的 存活时间很长 但k->tl被回收了 而v是强引用 导致v一直存在无法回收
解决方案
ExecutorService es; ThreadLocal tl; es.execute(()->{ //ThreadLocal增加变量 tl.set(obj); try { // 省略业务逻辑代码 }finally { //手动清理ThreadLocal tl.remove(); } });
用完后 remove掉k 整个kv对都移除了 就不存在引用了
总结
存在共享资源 为了解决线程安全问题 可以采用这个方案 但是要注意及时remove掉对应的k
课后思考
实际工作中,有很多平台型的技术方案都是采用 ThreadLocal 来传递一些上下文信息,例如 Spring 使用 ThreadLocal 来传递事务信息。我们曾经说过,异步编程已经很成熟了,那你觉得在异步场景中,是否可以使用 Spring 的事务管理器呢?
异步编程和tl用的线程不是同一个 变量就不能被共享了 这是完全错误的
保护性暂停
可以解决异步转同步问题
详情案例看印象笔记多线程55
总结
保护性暂停其实就是借助wait/notify之类的代码 当某if条件为真时立刻唤醒执行操作
课后思考
//获取受保护对象 T get(Predicate<T> p) { try { //obj的可见性无法保证 while(!p.test(obj)){ TimeUnit.SECONDS .sleep(timeout); } }catch(InterruptedException e){ throw new RuntimeException(e); } //返回非空的受保护对象 return obj; } //事件通知方法 void onChanged(T obj) { this.obj = obj; }
这个代码完成异步转同步怎么样?
sleep可能存在太长或太短 太长影响性能 太短又会频繁的唤醒在阻塞 且obj应是volatile的保证可见性
犹豫模式
和上面一样 也是多线程版的if
典型实现就是单次的实例化 dcl就是犹豫模式的体现
当一个线程执行过后 其他线程不会再次执行
思考题

可能多个线程看到inited为false进入 同时设置为true cal多次
解决
加锁或者使用AtomicBoolean的cas操作
Thread-Per-Message
每个任务都由一个线程去执行
起先我不以为意 因为java创建线程对应的就是操作系统的线程 成本很高 即便用完销毁也扛不住高并发 原来就是java语言本身有问题 其他语言引入了协程 更加轻量级
jdk19的虚拟线程就是协程
一个线程可以调度成千个协程 用完即销毁 不需要池化
课后思考
使用 Thread-Per-Message 模式会为每一个任务都创建一个线程,在高并发场景中,很容易导致应用 OOM,那有什么办法可以快速解决呢?
短暂解决可以调整jvm的堆大小 或者引入nio netty
口嗨一下
老师提到了通过限流来解决oom
工作线程
实际就是池化技术 避免了线程的频繁创建与销毁
正确创建注意点
使用有界队列防止oom
指明拒绝策略
给线程赋予一个业务相关的任务
线程池共用导致死锁
代码
//L1、L2阶段共用的线程池 ExecutorService es = Executors. newFixedThreadPool(2); //L1阶段的闭锁 CountDownLatch l1=new CountDownLatch(2); for (int i=0; i<2; i++){ System.out.println("L1"); //执行L1阶段任务 es.execute(()->{ //L2阶段的闭锁 CountDownLatch l2=new CountDownLatch(2); //执行L2阶段子任务 for (int j=0; j<2; j++){ es.execute(()->{ System.out.println("L2"); l2.countDown(); }); } //等待L2阶段任务执行完 l2.await(); l1.countDown(); }); } //等着L1阶段任务执行完 l1.await(); System.out.println("end");
两个线程执行完l1 后面都被卡在L2.await上了
虽然可以通过增加线程数来解决
但更通用的办法是为不同的任务创建不同的线程池
课后题

使用单个线程造成阻塞 且吞噬了异常
解决
可以使用不同线程池 且加上异常打印
两阶段终止
使用interrupt发送中断标志
最好不要用 可能第三方类库会catch异常清除标志位 导致线程无法结束
还是使用自己设置的volatile标志位比较稳妥
案例演示
public class Demo6 { public static void main(String[] args) throws InterruptedException { Proxy proxy = new Proxy(); proxy.start(); Thread.sleep(2000); proxy.stop(); } } class Proxy { boolean started = false; //采集线程 Thread rptThread; volatile boolean interrupted; //启动采集功能 public synchronized void start() { //不允许同时启动多个采集线程 if (started) { return; } started = true; rptThread = new Thread(() -> { //没有中断就一直执行 while (!interrupted) { //省略采集、回传实现 report(); //每隔两秒钟采集、回传一次数据 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //退出循环打印下 System.out.println("中断!"); started = false; }); rptThread.start(); } private void report() { System.out.println("监控数据~~"); } //终止采集功能 public synchronized void stop() { interrupted = true; } }
线程池销毁
强制销毁线程池 但未执行的任务会被放入set中 从中获取可以再次执行
class DemoExecutor extends AbstractExecutorService{ private final ExecutorService exec; private final Set<Runnable> taskCancelSet = new ConcurrentHashSet<>(); public List<Runnable> getCancelTasks() throws IllegalAccessException { if (!exec.isTerminated()) { throw new IllegalAccessException("线程池还未结束 无法获取"); } return new ArrayList<>(taskCancelSet); } public DemoExecutor(ExecutorService exec) { this.exec = exec; } @Override public void shutdown() { exec.shutdown(); } @Override public List<Runnable> shutdownNow() { return exec.shutdownNow(); } @Override public boolean isShutdown() { return exec.isShutdown(); } @Override public boolean isTerminated() { return exec.isTerminated(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout,unit); } @Override public void execute(Runnable task) { exec.execute(()->{ try { task.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { taskCancelSet.add(task); } } }); } }
使用shutdown
结束前拒绝新任务 会先执行完所有任务才销毁 比较温和
使用shutdownNow
强制销毁 拒绝新任务且中断正在执行的任务
对这个还有点疑问 进行中的任务还会执行 队列中的任务不会丢失 可以在返回值中获取
并且使用他推荐的方法并没有获取到对应的取消任务
生产消费者模式
优点
解耦
生产者和消费者可以没有任何的关系
异步
生产者只需要发送一个消息就无需再管后续流程了 无需等待
任务队列
平衡生产消费者之间的关系
如果生产慢 消费快 速率比是1:3 那么线程比例可以是3:1 中间添加任务队列做缓冲 否则线程数是一比一的关系
监控数据并批量入库
@Service public class BatchTaskService extends ServiceImpl<ProducerConsumerTestMapper, ProducerConsumerTest> { @SneakyThrows public void batch() { Proxy proxy = new Proxy(this); proxy.start(); TimeUnit.SECONDS.sleep(3); proxy.batchCollect(); proxy.stop(); TimeUnit.SECONDS.sleep(10); } } @Slf4j class Proxy { public Proxy(BatchTaskService batchTaskService) { this.batchTaskService = batchTaskService; } private BatchTaskService batchTaskService; boolean started = false; //采集线程 Thread rptThread; volatile boolean interrupted; LinkedBlockingQueue<Task> queue = new LinkedBlockingQueue<>(200); public void batchCollect() { log.info("开始批量采集"); ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { pool.execute(() -> { while (true) { try { List<Task> tasks = pollTasks(); execTasks(tasks); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } List<Task> pollTasks() throws InterruptedException { List<Task> ts = new LinkedList<>(); Task t = queue.take(); while (t != null) { ts.add(t); //非阻塞式获取一条任务 t = queue.poll(); } return ts; } //批量插入的方法 void execTasks(List<Task> ts) { log.info("批量插入~"); List<ProducerConsumerTest> list = new ArrayList<>(); for (Task t : ts) { ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest(); producerConsumerTest.setTime(t.getDate()); producerConsumerTest.setContent(t.getContent()); list.add(producerConsumerTest); } log.info("列表数据:{}", list); batchTaskService.saveBatch(list); } //启动采集功能 public synchronized void start() { log.info("启动采集功能~"); //不允许同时启动多个采集线程 if (started) { return; } started = true; rptThread = new Thread(() -> { //没有中断就一直执行 while (!interrupted) { //省略采集、回传实现 report(); //每隔两秒钟采集、回传一次数据 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } //退出循环打印下 System.out.println("中断!"); started = false; }); rptThread.start(); } private void report() { log.info("监控数据"); Task task = new Task("监控数据~~", new Date()); queue.add(task); } //终止采集功能 public synchronized void stop() { log.info("终止采集功能"); interrupted = true; } } @Data @AllArgsConstructor class Task { private String content; private Date date; }
一般其实用mq来实现的
意外暂停的话mq还可以保留数据
思考

这个是消费消息的 如果要终止任务了 投放一个毒丸 只要识别到这个消息 即break
案例汇总
限流
令牌桶算法
如果限制速率是每秒通过10个请求 那么1/10秒就会在桶中创建一个令牌
还有个容量b 主要是用来应对突发容量 比如为20 那么突发20个请求带走桶中所有令牌 可能是一秒通过了20个请求 但之后又恢复原样了
guava如何实现的?
单个线程提前抢占
 速率和容量都是1 之前桶中没有令牌 那么第2秒来了请求 在第3秒的时候才能真正获取 下一令牌产生时间就会相应的延后 
延后令牌产生时间
两线程提前抢占
 T1线程抢占第2秒令牌发现没有 那么第3秒的令牌归他 而同时T2线程也来了 那么第四秒的令牌归T2  最终下一令牌时间相应的延后到第5秒
再延后时间
令牌产生后再抢占
 第5秒生成了令牌 T3到第7秒才抢占 其实5,6秒生成的令牌会被丢弃 第8秒重新生成一个令牌 
netty省略
disruptor
RingBuffer 如何提升性能
使用了数组 一次性创建完毕 内存上是连续的
消费第一个的时候会将相邻的几个元素也从内存加载到cpu缓存中 这样消费其他元素时直接通过cpu缓存中读取 大大提高效率
生产者发布事件的时候也不是new 而是通过set重新设置内容 避免了频繁创建对象
并发模型汇总
Actor
该模型是异步单线程 无锁无cas的
每个actor之间是相互隔离的 所以是线程安全的
但它不保证消息的可靠传达和处理 也无法处理顺序问题 个人感觉不是很靠谱但还是增长了见识
协程
轻量级的线程 jdk19才有虚拟线程 其他语言倒是不少都支持 了解即可
csp模型
解决线程通信一般有两种方案
共享内存
java就是采用的这种
消息传递
go用的是这种
与actor的不同之处在于
它是借助一个channel阻塞队列通信的 能保证可靠送达 但可能会导致死锁
结语
选择比努力重要 选择好的赛道大过盲目努力
找到自己的问题 并不断改正
人不应该高估自己 要承认自己的浅薄与无知
不是随便一个领域我们都能干的好的 有些事该放就放