JDK并发包详细概括
发布时间:2021-11-12 17:08:06 所属栏目:教程 来源:互联网
导读:本文主要介绍jdk中常用的同步控制工具以及并发容器。 同步控制工具类 ReentrantLock可重入锁 Condition Semaphore信号量 ReadWriteLock读写分离锁 CountDownLatch倒数计时器 CyclicBarrier循环栅栏 LockSupport阻塞线程 并发容器 Collections.synchronizedMa
本文主要介绍jdk中常用的同步控制工具以及并发容器。 同步控制工具类 ReentrantLock可重入锁 Condition Semaphore信号量 ReadWriteLock读写分离锁 CountDownLatch倒数计时器 CyclicBarrier循环栅栏 LockSupport阻塞线程 并发容器 Collections.synchronizedMap ConcurrentHashMap BlockingQueue CopyOnWriteArrayList 同步控制工具类 ReentrantLock 简而言之, 就是自由度更高的synchronized, 主要具备以下优点. 可重入: 单线程可以重复进入,但要重复退出 可中断: lock.lockInterruptibly() 可限时: 超时不能获得锁,就返回false,不会永久等待构成死锁 公平锁: 先来先得, public ReentrantLock(boolean fair), 默认锁不公平的, 根据线程优先级竞争. 示例 public class ReenterLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 10000; j++) { lock.lock(); // 超时设置 // lock.tryLock(5, TimeUnit.SECONDS); try { i++; } finally { // 需要放在finally里释放, 如果上面lock了两次, 这边也要unlock两次 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock tl = new ReenterLock(); Thread t1 = new Thread(tl); Thread t2 = new Thread(tl); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(i); } } 中断死锁 线程1, 线程2分别去获取lock1, lock2, 触发死锁. 最终通过DeadlockChecker来触发线程中断. public class DeadLock implements Runnable{ public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public DeadLock(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1){ lock1.lockInterruptibly(); try { Thread.sleep(500); }catch (InterruptedException e){} lock2.lockInterruptibly(); }else { lock2.lockInterruptibly(); try { Thread.sleep(500); }catch (InterruptedException e){} lock1.lockInterruptibly(); } }catch (InterruptedException e){ e.printStackTrace(); }finally { if (lock1.isHeldByCurrentThread()) lock1.unlock(); if (lock2.isHeldByCurrentThread()) lock2.unlock(); System.out.println(Thread.currentThread().getId() + "线程中断"); } } public static void main(String[] args) throws InterruptedException { DeadLock deadLock1 = new DeadLock(1); DeadLock deadLock2 = new DeadLock(2); // 线程1, 线程2分别去获取lock1, lock2. 导致死锁 Thread t1 = new Thread(deadLock1); Thread t2 = new Thread(deadLock2); t1.start(); t2.start(); Thread.sleep(1000); // 死锁检查, 触发中断 DeadlockChecker.check(); } } Condition 类似于 Object.wait()和Object.notify(), 需要与ReentrantLock结合使用. 具体API如下: // await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时, // 线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。 void await() throws InterruptedException; // awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; // singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。 // 这和Obejct.notify()方法很类似。 void signal(); void signalAll(); 示例 public class ReenterLockCondition implements Runnable{ public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 注意放到finally中释放 lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReenterLockCondition t1 = new ReenterLockCondition(); Thread tt = new Thread(t1); tt.start(); Thread.sleep(2000); System.out.println("after sleep, signal!"); // 通知线程tt继续执行. 唤醒同样需要重新获得锁 lock.lock(); condition.signal(); lock.unlock(); } } Semaphore信号量 锁一般都是互斥排他的, 而信号量可以认为是一个共享锁, 允许N个线程同时进入临界区, 但是超出许可范围的只能等待. 如果N = 1, 则类似于lock. 具体API如下, 通过acquire获取信号量, 通过release释放 1 public void acquire() 2 public void acquireUninterruptibly() 3 public boolean tryAcquire() 4 public boolean tryAcquire(long timeout, TimeUnit unit) 5 public void release() 示例 模拟20个线程, 但是信号量只设置了5个许可. 因此线程是按序每2秒5个的打印job done. public class SemapDemo implements Runnable{ // 设置5个许可 final Semaphore semp = new Semaphore(5); @Override public void run() { try { semp.acquire(); // 模拟线程耗时操作 Thread.sleep(2000L); System.out.println("Job done! " + Thread.currentThread().getId()); } catch (InterruptedException e) { e.printStackTrace(); } finally { semp.release(); } } public static void main(String[] args){ ExecutorService service = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for (int i = 0; i < 20; i++) { service.submit(demo); } } } ReadWriteLock 读写分离锁, 可以大幅提升系统并行度. 读-读不互斥:读读之间不阻塞。 读-写互斥:读阻塞写,写也会阻塞读。 写-写互斥:写写阻塞。 示例 使用方法与ReentrantLock类似, 只是读写锁分离. 1 private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); 2 private static Lock readLock = readWriteLock.readLock(); 3 private static Lock writeLock = readWriteLock.writeLock(); CountDownLatch倒数计时器 一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程, 等待所有检查线程全部完工后,再执行. 示例 public class CountDownLatchDemo implements Runnable{ static final CountDownLatch end = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try { Thread.sleep(new Random().nextInt(10) * 1000); System.out.println("check complete!"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { service.submit(demo); } // 等待检查 end.await(); // 所有线程检查完毕, 发射火箭. System.out.println("fire"); service.shutdown(); } } CyclicBarrier循环栅栏 Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐 第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程. 示例 public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldier) { this.cyclic = cyclic; this.soldier = soldier; } @Override public void run() { try { // 等待所有士兵到期 cyclic.await(); doWork(); // 等待所有士兵完成工作 cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt() % 10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + " 任务完成!"); } } public static class BarrierRun implements Runnable { boolean flag; int N; public BarrierRun(boolean flag, int n) { this.flag = flag; N = n; } @Override public void run() { if (flag) { System.out.println("士兵:" + N + "个, 任务完成!"); } else { System.out.println("士兵:" + N + "个, 集合完毕!"); flag = true; } } } public static void main(String[] args){ final int N = 5; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); // 设置屏障点, 主要为了执行这个方法. System.out.println("集合任务!"); for (int i = 0; i < N; i++) { System.out.println("士兵" + i + " 报到!"); allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i)); allSoldier[i].start(); } } } 结果 集合任务! 士兵0 报到! 士兵1 报到! 士兵2 报到! 士兵3 报到! 士兵4 报到! 士兵:5个, 集合完毕! 士兵3 任务完成! 士兵1 任务完成! 士兵0 任务完成! 士兵4 任务完成! 士兵2 任务完成! 士兵:5个, 任务完成! LockSupport 一个线程阻塞工具, 可以在任意位置让线程阻塞. 与suspend()比较, 如果unpark发生在park之前, 并不会导致线程冻结, 也不需要获取锁. API 1 LockSupport.park(); 2 LockSupport.unpark(t1); 中断响应 能够响应中断,但不抛出异常。 中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志 public class LockSupportDemo { public static Object u = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread { public ChangeObjectThread(String name) { super(name); } @Override public void run() { synchronized (u) { System.out.println("in " + getName()); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); } } 并发容器 Collections.synchronizedMap 其本质是在读写map操作上都加了锁, 因此不推荐在高并发场景使用. ConcurrentHashMap 支持高并发的HashMap. 通过将一个大的hashmap切割成无数个小的分区hashmap(Segment<K,V>). 执行put的时候把key映射到其中一个小的分区中, 假如有十几个线程, 那么可能就会对应十几个分区. public V put(K key, V value) { ConcurrentHashMap.Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; // 通过unsafe对j进行偏移来寻找key所对应的分区 if ((s = (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 如果分区不存在, 则创建新的分区 s = ensureSegment(j); // kv放到分区中 return s.put(key, hash, value, false); } Segment.put源码 Segment(float lf, int threshold, ConcurrentHashMap.HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { // tryLock通过无锁cas操作尝试获取锁(无等待), 继承自ReentrantLock. // 如果成功则, node = null // 如果不成功, 则可能其他线程已经在插入数据了, // 此时会尝试继续获取锁tryLock, 自旋MAX_SCAN_RETRIES次, 若还是拿不到锁才开始lock ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { ConcurrentHashMap.HashEntry<K,V>[] tab = table; // 获取分区中哪一个entry链的index int index = (tab.length - 1) & hash; // 获取第一个entry ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index); for (ConcurrentHashMap.HashEntry<K,V> e = first;;) { // e != null , 存在hash冲突, 把他加到当前链表中 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // 无hash冲突, new entry if (node != null) node.setNext(first); else node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 空间大小超出阈值, 需要rehash, 翻倍空间. if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //放到分区中 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } BlockingQueue 阻塞队列, 主要用于多线程之间共享数据. 当一个线程读取数据时, 如果队列是空的, 则当前线程会进入等待状态. 如果队列满了, 当一个线程尝试写入数据时, 同样会进入等待状态. 适用于生产消费者模型. 其源码实现也相对简单. public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列满了, 写进入等待 while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列空的, 读进入等待 while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } 因为BlockingQueue在put take等操作有锁, 因此非高性能容器, 如果需要高并发支持的队列, 则可以使用ConcurrentLinkedQueue. 他内部也是运用了大量无锁操作. CopyOnWriteArrayList CopyOnWriteArrayList通过在新增元素时, 复制一份新的数组出来, 并在其中写入数据, 之后将原数组引用指向到新数组. 其Add操作是在内部通过ReentrantLock进行锁保护, 防止多线程场景复制多份数组. 而Read操作内部无锁, 直接返回数组引用, 并发下效率高, 因此适用于读多写少的场景. 源码 public boolean add(E e) { final ReentrantLock lock = this.lock; // 写数据的锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // 复制到新的数组 Object[] newElements = Arrays.copyOf(elements, len + 1); // 加入新元素 newElements[len] = e; // 修改引用 setArray(newElements); return true; } finally { lock.unlock(); } } final void setArray(Object[] a) { array = a; } // 读的时候无锁 public E get(int index) { return get(getArray(), index); } 示例 使用10个读线程, 100个写线程. 如果使用ArrayList实现, 那么有可能是在运行过程中抛出ConcurrentModificationException. 原因很简单, ArrayList在遍历的时候会check modCount是否发生变化, 如果一边读一边写就会抛异常. public class CopyOnWriteListDemo { static List<UUID> list = new CopyOnWriteArrayList<UUID>(); // static List<UUID> list = new ArrayList<UUID>(); // 往list中写数据 public static class AddThread implements Runnable { @Override public void run() { UUID uuid = UUID.randomUUID(); list.add(uuid); System.out.println("++Add uuid : " + uuid); } } // 从list中读数据 public static class ReadThread implements Runnable { @Override public void run() { System.out.println("start read size: " + list.size() + " thread : " + Thread.currentThread().getName()); for (UUID uuid : list) { System.out.println("Read uuid : " + uuid + " size : " + list.size() + "thread: " + Thread.currentThread().getName()); } } } public static void main(String[] args) throws InterruptedException { initThread(new AddThread(), 10); initThread(new ReadThread(), 100); } private static void initThread(Runnable runnable, int maxNum) throws InterruptedException { Thread[] ts = new Thread[maxNum]; for (int k = 0; k < maxNum; k++) { ts[k] = new Thread(runnable); } for (int k = 0; k < maxNum; k++) { ts[k].start(); } } } ![]() (编辑:三明站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐