导图社区 并发编程-精通JUC
java并发编程中,常用的juc包工具类。
编辑于2020-07-22 01:20:38JAVA
并发编程-精通JUC
并发工具类的分类
为了线程安全(从底层原理来分类)
1. 互斥同步
1. 想要获取资源,必须先要独占它
互斥同步锁
synchronized
ReetrantLock
ReadWriteLock
...
同步工具类
本质的原理是利用Synchronized对集合进行了同步,所以也属于互斥同步的范畴 (老版本用的)
Collections.synchronizedList(new ArrayList<>()); 等
Vector 等
2. 非互斥同步
atomic包,原子类
Atomic*基本类型原子类
AtomicInteger:整形原子类
AtomicLong:长整型原子类
AtomicBoolean:布尔类型原子类
Atomic*Array数组类型原子类
数组里的元素都可以保证原子性
AtomicIntegerArray:整形数组原子类
AtomicLongArray:长整型数组原子类
AtomicReferenceArray:引用类型数组原子类
Atomic*Reference引用类型原子类
AtomicReference:引用类型原子类
AtomicStampedReference:引用类型原子类的升级
带时间戳,可以解决ABA问题
AtomicMarkableRefrence
Atomic*FieldUpdater升级原子类
用Atomic*FieldUpdater等升级自己的变量
Adder加法器
性能比Atomic基本类型原子类更好
LongAdder
DoubleAdder
Accumulator累加器
LongAccumulator
DoubleAccumulator
3. 结合互斥和非互斥同步
线程安全的并发容器
ConcurrentHashMap
CopyOnWriteArrayList
并发队列
阻塞队列
ArrayBlockingQueue
LinkedBlockingQueue
PoiorityBlockingQueue
SynchronousQueue
DelayedQueue
TransferQueue
...
非阻塞队列
ConcurrentLinkedQueue
ConcurrentSkipListMap和 ConcurrentSkipListSet
4. 无同步方案、不可变
final关键字
永远都不会变化,它就也是线程安全的
线程封闭
ThreadLocal
栈封闭
为了线程安全(从使用者的角度来分类)
为了方便管理线程、提高效率
线程池相关
Executor
Executor Executor 是一个抽象层面的核心接口(大致代码如下)。 public interface Executor { void execute(Runnable command); }
Executors
Executors Executors 是一个工具类,类似于 Collections。提供工厂方法来创建不同类型的线程池,比如 FixedThreadPool 或 CachedThreadPool。 Executors 部分代码: public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } }
ExectorService
ExecutorService ExecutorService 接口 对 Executor 接口进行了扩展,提供了返回 Future 对象,终止,关闭线程池等方法。当调用 shutDown 方法时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。 Future 对象提供了异步执行,这意味着无需等待任务执行的完成,只要提交需要执行的任务,然后在需要时检查 Future 是否已经有了结果,如果任务已经执行完成,就可以通过 Future.get() 方法获得执行结果。需要注意的是,Future.get() 方法是一个阻塞式的方法,如果调用时任务还没有完成,会等待直到任务执行结束。 通过 ExecutorService.submit() 方法返回的 Future 对象,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。 ExecutorService 部分代码如下: public interface ExecutorService extends Executor { void shutdown(); <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; }
区别和总结,见注释
Executor vs ExecutorService vs Executors 正如上面所说,这三者均是 Executor 框架中的一部分。Java 开发者很有必要学习和理解他们,以便更高效的使用 Java 提供的不同类型的线程池。总结一下这三者间的区别,以便大家更好的理解: Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口 Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。 Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。 Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。可以通过 《Java Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。 Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。 总结 下表列出了 Executor 和 ExecutorService 的区别: Executor ExecutorService Executor 是 Java 线程池的核心接口,用来并发执行提交的任务 ExecutorService 是 Executor 接口的扩展,提供了异步执行和关闭线程池的方法 提供execute()方法用来提交任务 提供submit()方法用来提交任务 execute()方法无返回值 submit()方法返回Future对象,可用来获取任务执行结果 不能取消任务 可以通过Future.cancel()取消pending中的任务 没有提供和关闭线程池有关的方法 提供了关闭线程池的方法
常见线程池
FixedThreadPool
创建固定线程数量的线程池 nThreads:线程数量 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
CachedThreadPool
线程数量无限大,Integer.MAX_VALUE public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
ScheduledThreadPool
SingleThreadExecutor
简单一句话概括,就是创建一个单线程的线程池。
ForkJoinPool
...
能获取子线程的运行结果
Callable
Future
FutureTask
为了线程之间配合,来满足业务逻辑
CountDownLatch
package flowcontrol.countdownlatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 描述: 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步。当所有人都到终点后,比赛结束。 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch begin = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { System.out.println("No." + no + "准备完毕,等待发令枪"); try { begin.await(); System.out.println("No." + no + "开始跑步了"); Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + "跑到终点了"); } catch (InterruptedException e) { e.printStackTrace(); } finally { end.countDown(); } } }; service.submit(runnable); } //裁判员检查发令枪... Thread.sleep(5000); System.out.println("发令枪响,比赛开始!"); begin.countDown(); end.await(); System.out.println("所有人到达终点,比赛结束"); service.shutdown(); } }
CyclicBarrier
package flowcontrol.cyclicbarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 描述: 演示CyclicBarrier */ public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("所有人都到场了, 大家统一出发!"); } }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, cyclicBarrier)).start(); } } static class Task implements Runnable{ private int id; private CyclicBarrier cyclicBarrier; public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + id + "现在前往集合地点"); try { Thread.sleep((long) (Math.random()*10000)); System.out.println("线程"+id+"到了集合地点,开始等待其他人到达"); cyclicBarrier.await(); System.out.println("线程"+id+"出发了"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
Semaphore
import java.util.concurrent.Semaphore; public class ThreadTest { private static Semaphore semaphoreA = new Semaphore(1); private static Semaphore semaphoreB = new Semaphore(0); private static Semaphore semaphoreC = new Semaphore(0); private static Thread threadA, threadB, threadC; public static void main(String[] args) { threadA = new Thread(() -> { for (int i = 0; i < 10; i++) { try { semaphoreA.acquire(); System.out.print(Thread.currentThread().getName()); semaphoreB.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A"); threadB = new Thread(() -> { for (int i = 0; i < 10; i++) { try { semaphoreB.acquire(); System.out.print(Thread.currentThread().getName()); semaphoreC.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B"); threadC = new Thread(() -> { for (int i = 0; i < 10; i++) { try { semaphoreC.acquire(); System.out.print(Thread.currentThread().getName()); semaphoreA.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C"); threadA.start(); threadB.start(); threadC.start(); } } 运行结果:ABCABCABCABCABCABCABCABCABCABC
Condition
package flowcontrol.condition; import java.util.PriorityQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 描述: 演示用Condition实现生产者消费者模式 */ public class ConditionDemo { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionDemo2 conditionDemo2 = new ConditionDemo2(); Producer producer = conditionDemo2.new Producer(); Consumer consumer = conditionDemo2.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("队列空,等待数据"); try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signalAll(); System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素"); } finally { lock.unlock(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { lock.lock(); try { while (queue.size() == queueSize) { System.out.println("队列满,等待有空余"); try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(1); notEmpty.signalAll(); System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size())); } finally { lock.unlock(); } } } } }
Exchanger
Phaser
...