导图社区 并发基础
This is a mind map about 并发基础,Main content: 线程池,并发集合,atomic,并发工具,阻塞队列。
编辑于2025-03-28 15:09:20并发基础
并发集合
HashMap
数据结构
数组
默认长度16,最大长度1<<30, 长度必须2的指数幂
扩容
扩容为原来的2倍
扩容长度2的指数幂
扩容因子0.75
时间复杂度、空间复杂度获取应均衡
牛顿二项式推断
安全性
1.7多线程扩容产生循环链表(头插入)
get操作会丢失数据
1.8采用高低位拆分,避免循环链表
链表
链表转红黑树
链表长度达到8
扩展因子为0.75的情况下,根据泊松分布概率计算出8是最合理
数组长度达到64
红黑树
红黑树转链表
链表长度达到6
concurrentHashMap
数据结构
与HashMap基本类似
区别
1、内部在数据 写入时加了同步机制(分段锁)保证线程安全,读操作是无锁操作
2、扩容时老数据的转移 是并发执行的,这样扩容的效率更高。
写加锁,读无锁
Java7与java8比较
数据结构
Java7采用Segment分段锁实现,Java8使用数组+链表+红黑树实现
保证并发
Java7 ConcurrentHashMap基于ReentrantLock实现分段锁
所有的Segment继承ReentrantLock
Java8中 ConcurrentHashMap基于Node+分段锁+CAS保证线程安全,分段锁基于synchronized
Hash碰撞
java7链表
java8链表+红黑树
时间复杂度
Java7遍历链表O(n)
Java8如果转换为红黑树O(log(n))
CopyOnWriteArrayList
核心思想
读写分离,空间换时间,避免为保证并发安全导致的激烈的锁竞争
使用场景
适用于读多写少的情况,最大程度的提高读的效率
是最终一致性,在写的过程中,原有的读的数据是不会发生更新的,只有新的读 才能读到最新数据;
如何使其他线程能够及时读到新的数据,需要使用volatile变量
写的时候不能并发写,需要对写操作进行加锁
缺点
1、内存占用大 2、存在数据一致性问题 3、数据量大的情况下复制开销很大
线程池
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不 仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、 调优和监控 如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁 线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多
线程
线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模 型,Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有 一个对应的线程
状态
NEW,新建 RUNNABLE,运行 BLOCKED,阻塞 WAITING,等待 TIMED_WAITING,超时等待 TERMINATED,终结
子主题
优势
重用存在的线程,减少线程创建,消亡的开销,提高性能 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资 源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
流程
线程池的具体实现
ThreadPoolExecutor 默认线程池
public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) 任务提交 1、public void execute() //提交任务无返回值2 2、public Future<?> submit() //任务执行完成后有返回值
参数解释
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当 前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会 提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线 程执行任务,前提是当前线程数小于maximumPoolSize
keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时 候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待 的时间超过了keepAliveTime;
unit
keepAliveTime的单位
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供 了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到 另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程 时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设 置了线程的名称。
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必 须采取一种策略处理该任务,线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。
数量控制
cup密集度
= CPU核数+1 (现代CPU支持超线程)
IO密集度
cup核数倍数
ScheduledThreadPoolExecutor 定时线程池
提交任务 的方式
1. schedule
该方法是指任务在指定延迟时间到达后触发,只会执行一次 1 public ScheduledFuture<?> schedule(Runnable command, 2 long delay, 3 TimeUnit unit) { 4 //参数校验 5 if (command == null || unit == null) 6 throw new NullPointerException(); 7 //这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask 8 //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个 空方法 9 RunnableScheduledFuture<?> t = decorateTask(command, 10 new ScheduledFutureTask<Void>(command, null,11 triggerTime(delay, unit))); 12 //包装好任务以后,就进行提交了 13 delayedExecute(t); 14 return t; 15 } 任务提交方法: 1 private void delayedExecute(RunnableScheduledFuture<?> task) { 2 //如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉 3 if (isShutdown()) 4 reject(task); 5 else { 6 //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列 7 super.getQueue().add(task);//使用用的DelayedWorkQueue 8 //如果当前状态无法执行任务,则取消 9 if (isShutdown() && 10 !canRunInCurrentRunState(task.isPeriodic()) && 11 remove(task)) 12 task.cancel(false); 13 else 14 //这里是增加一个worker线程,避免提交的任务没有worker去执行 15 //原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列 16 ensurePrestart(); 17 } 18 }
2. scheduledAtFixedRate
3. scheduledWithFixedDelay
存储任务
1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若 time相同则根据sequenceNumber排序; 2. DelayQueue也是一个无界队列;
存储逻辑结构-堆存储
数据数据结构-数组
使用场景
分布式锁、用定时线程续命
注册中心、心跳服务
Executors类提供了4种不同的线程池
newCachedThreadPool
用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务
newFixedThreadPool
创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制
newSingleThreadExecutor
创建一个单线程的线程池,适用于需要保证顺序执行各个任务。
newScheduledThreadPool
适用于执行延时或者周期性任务
关闭线程池
shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。
Fork/Join
Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后 得到大任务结果的框架
特性
1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService ) 2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。 3. ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配 合使用 ManagedBlocker。 1. ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务 (ForkJoinTask)。 2. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作 队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。 3. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线 程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。 4. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。 5. 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
使用
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通 常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类: RecursiveAction:用于没有返回结果的任务。(比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成 更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction) RecursiveTask :用于有返回结果的任务。(可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并) CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数
执行流程
阻塞队列
队列
类型
1. 无限队列 (unbounded queue ) - 几乎可以无限增长
2. 有限队列 ( bounded queue ) - 定义了最大容量
数据结构
通常用链表或者数组实现
一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级 队列
主要操作:入队(EnQueue)与出队(Dequeue)
BlockingQueue
解决并发生产者 - 消费者问题 的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且 BlockingQueue提供了超时return null的机制
ArrayBlockingQueue
由数组支持的有界队列
基于ReentrantLock保证线程安全
根据Condition实现队列满时的阻塞
LinkedBlockingQueue
是一个基于链表的无界队列(理论上有界)
PriorityBlockingQueue
由优先级堆支持的无界优先级队列
DelayQueue
由优先级堆支持的、基于时间的调度队列
内部基于无界队列PriorityQueue实现
而无界 队列基于数组的扩容实现。
入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口
并发工具
Semaphore
Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。 public class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorServicethreadPool = Executors .newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (inti = 0; i< THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } }
构造方法
1 public Semaphore(int permits) 2 public Semaphore(int permits, boolean fair)
permits 表示许可线程的数量==存入AQS state状态,表示限流数量 fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线 程
重要方法
acquire() 表示阻塞并获取许可 release() 表示释放许可
使用场景
资源访问,服务限流(Hystrix里限流就有基于信号量方式)。
CountDownLatch
使用场景
能够使一个线程等待其他线程完成各自的工作后再执行 Zookeeper分布式锁,Jmeter模拟高并发等
重要方法
1 CountDownLatch.countDown() 2 CountDownLatch.await();
CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程 到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
重要方法
cyclicBarrier.await(); 构造方法是CyclicBarrier(int parties)
atomic
可以通过锁和循环CAS的方式来实现原子操作
基本类
AtomicInteger、AtomicLong、AtomicBoolean
引用类型
AtomicReference、AtomicReference的ABA实例、 AtomicStampedRerence、AtomicMarkableReference
数组类型
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
属性原子修改器
AtomicIntegerFieldUpdater、 AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
spring