加入收藏 | 设为首页 | 会员中心 | 我要投稿 三明站长网 (https://www.0598zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

JDK并发包详细概括

发布时间:2021-11-12 17:08:06 所属栏目:教程 来源:互联网
导读:本文主要介绍jdk中常用的同步控制工具以及并发容器。 同步控制工具类 ReentrantLock可重入锁 Condition Semaphore信号量 ReadWriteLock读写分离锁 CountDownLatch倒数计时器 CyclicBarrier循环栅栏 LockSupport阻塞线程 并发容器 Collections.synchronizedMa

本文主要介绍jdk中常用的同步控制工具以及并发容器。
 
同步控制工具类
ReentrantLock可重入锁
Condition
Semaphore信号量
ReadWriteLock读写分离锁
CountDownLatch倒数计时器
CyclicBarrier循环栅栏
LockSupport阻塞线程
并发容器
Collections.synchronizedMap
ConcurrentHashMap
BlockingQueue
CopyOnWriteArrayList
同步控制工具类
ReentrantLock
简而言之, 就是自由度更高的synchronized, 主要具备以下优点.
 
可重入: 单线程可以重复进入,但要重复退出
可中断: lock.lockInterruptibly()
可限时: 超时不能获得锁,就返回false,不会永久等待构成死锁
公平锁: 先来先得, public ReentrantLock(boolean fair), 默认锁不公平的, 根据线程优先级竞争.
示例
 
public class ReenterLock implements Runnable {
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
 
    @Override
    public void run() {
        for (int j = 0; j < 10000; j++) {
            lock.lock();
            // 超时设置
//            lock.tryLock(5, TimeUnit.SECONDS);
            try {
                i++;
            } finally {
                // 需要放在finally里释放, 如果上面lock了两次, 这边也要unlock两次
                lock.unlock();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        ReenterLock tl = new ReenterLock();
        Thread t1 = new Thread(tl);
        Thread t2 = new Thread(tl);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}
 
中断死锁
 
线程1, 线程2分别去获取lock1, lock2, 触发死锁. 最终通过DeadlockChecker来触发线程中断.
 
public class DeadLock implements Runnable{
 
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
 
    public DeadLock(int lock) {
        this.lock = lock;
    }
 
    @Override
    public void run() {
        try {
            if (lock == 1){
                lock1.lockInterruptibly();
                try {
                    Thread.sleep(500);
                }catch (InterruptedException e){}
                lock2.lockInterruptibly();
 
            }else {
                lock2.lockInterruptibly();
                try {
                    Thread.sleep(500);
                }catch (InterruptedException e){}
                lock1.lockInterruptibly();
 
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            if (lock1.isHeldByCurrentThread())
                lock1.unlock();
            if (lock2.isHeldByCurrentThread())
                lock2.unlock();
            System.out.println(Thread.currentThread().getId() + "线程中断");
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        DeadLock deadLock1 = new DeadLock(1);
        DeadLock deadLock2 = new DeadLock(2);
        // 线程1, 线程2分别去获取lock1, lock2. 导致死锁
        Thread t1 = new Thread(deadLock1);
        Thread t2 = new Thread(deadLock2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        // 死锁检查, 触发中断
        DeadlockChecker.check();
 
    }
}
 
Condition
类似于 Object.wait()和Object.notify(), 需要与ReentrantLock结合使用.
 
具体API如下:
 
// await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,
    // 线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
    void await() throws InterruptedException;
    // awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    // singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。
    // 这和Obejct.notify()方法很类似。
    void signal();
    void signalAll();
 
示例
 
public class ReenterLockCondition implements Runnable{
 
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();
 
    @Override
    public void run() {
        try {
            lock.lock();
            condition.await();
            System.out.println("Thread is going on");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 注意放到finally中释放
            lock.unlock();
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        ReenterLockCondition t1 = new ReenterLockCondition();
        Thread tt = new Thread(t1);
        tt.start();
        Thread.sleep(2000);
        System.out.println("after sleep, signal!");
        // 通知线程tt继续执行. 唤醒同样需要重新获得锁
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}
 
Semaphore信号量
 
锁一般都是互斥排他的, 而信号量可以认为是一个共享锁,
 
允许N个线程同时进入临界区, 但是超出许可范围的只能等待.
 
如果N = 1, 则类似于lock.
 
具体API如下, 通过acquire获取信号量, 通过release释放
 
1    public void acquire()
2    public void acquireUninterruptibly()
3    public boolean tryAcquire()
4    public boolean tryAcquire(long timeout, TimeUnit unit)
5    public void release()
 
示例
 
模拟20个线程, 但是信号量只设置了5个许可.
 
因此线程是按序每2秒5个的打印job done.
 
public class SemapDemo implements Runnable{
 
    // 设置5个许可
    final Semaphore semp = new Semaphore(5);
 
    @Override
    public void run() {
        try {
            semp.acquire();
            // 模拟线程耗时操作
            Thread.sleep(2000L);
            System.out.println("Job done! " + Thread.currentThread().getId());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semp.release();
        }
    }
 
    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            service.submit(demo);
        }
    }
}
 
ReadWriteLock
读写分离锁, 可以大幅提升系统并行度.
 
读-读不互斥:读读之间不阻塞。
读-写互斥:读阻塞写,写也会阻塞读。
写-写互斥:写写阻塞。
示例
 
使用方法与ReentrantLock类似, 只是读写锁分离.
 
1 private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
2 private static Lock readLock = readWriteLock.readLock();
3 private static Lock writeLock = readWriteLock.writeLock();
 
CountDownLatch倒数计时器
一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。
 
只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程,
 
等待所有检查线程全部完工后,再执行.
 
 
 
示例
 
public class CountDownLatchDemo implements Runnable{
    static final CountDownLatch end = new CountDownLatch(10);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();
 
    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("check complete!");
            end.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            service.submit(demo);
        }
        // 等待检查
        end.await();
        // 所有线程检查完毕, 发射火箭.
        System.out.println("fire");
        service.shutdown();
    }
}
 
CyclicBarrier循环栅栏
Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐
 
第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程.
 
 
 
示例
 
public class CyclicBarrierDemo {
 
    public static class Soldier implements Runnable {
 
        private String soldier;
        private final CyclicBarrier cyclic;
 
        Soldier(CyclicBarrier cyclic, String soldier) {
            this.cyclic = cyclic;
            this.soldier = soldier;
        }
 
        @Override
        public void run() {
            try {
                // 等待所有士兵到期
                cyclic.await();
                doWork();
                // 等待所有士兵完成工作
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
 
        void doWork() {
            try {
                Thread.sleep(Math.abs(new Random().nextInt() % 10000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldier + " 任务完成!");
        }
    }
 
    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;
 
        public BarrierRun(boolean flag, int n) {
            this.flag = flag;
            N = n;
        }
 
        @Override
        public void run() {
            if (flag) {
                System.out.println("士兵:" + N + "个, 任务完成!");
            } else {
                System.out.println("士兵:" + N + "个, 集合完毕!");
                flag = true;
            }
        }
    }
 
    public static void main(String[] args){
        final int N = 5;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        // 设置屏障点, 主要为了执行这个方法.
        System.out.println("集合任务!");
        for (int i = 0; i < N; i++) {
            System.out.println("士兵" + i + " 报到!");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
            allSoldier[i].start();
        }
 
    }
}
 
结果
 
集合任务!
士兵0 报到!
士兵1 报到!
士兵2 报到!
士兵3 报到!
士兵4 报到!
士兵:5个, 集合完毕!
士兵3 任务完成!
士兵1 任务完成!
士兵0 任务完成!
士兵4 任务完成!
士兵2 任务完成!
士兵:5个, 任务完成!
 
LockSupport
一个线程阻塞工具, 可以在任意位置让线程阻塞.
 
与suspend()比较, 如果unpark发生在park之前, 并不会导致线程冻结, 也不需要获取锁.
 
API
 
1 LockSupport.park();
2 LockSupport.unpark(t1);
 
中断响应
 
能够响应中断,但不抛出异常。
 
中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志
 
public class LockSupportDemo {
    public static Object u = new Object();
    static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    public static class ChangeObjectThread extends Thread {
 
        public ChangeObjectThread(String name) {
            super(name);
        }
 
        @Override
        public void run() {
            synchronized (u) {
                System.out.println("in " + getName());
                LockSupport.park();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        t1.start();
        Thread.sleep(100);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}
 
并发容器
Collections.synchronizedMap
其本质是在读写map操作上都加了锁, 因此不推荐在高并发场景使用.
 
 
 
ConcurrentHashMap
支持高并发的HashMap. 通过将一个大的hashmap切割成无数个小的分区hashmap(Segment<K,V>).
 
执行put的时候把key映射到其中一个小的分区中, 假如有十几个线程, 那么可能就会对应十几个分区.
 
public V put(K key, V value) {
        ConcurrentHashMap.Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        // 通过unsafe对j进行偏移来寻找key所对应的分区
        if ((s = (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            // 如果分区不存在, 则创建新的分区
            s = ensureSegment(j);
        // kv放到分区中
        return s.put(key, hash, value, false);
    }
 
Segment.put源码
 
 
 
Segment(float lf, int threshold, ConcurrentHashMap.HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
 
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // tryLock通过无锁cas操作尝试获取锁(无等待), 继承自ReentrantLock.
        // 如果成功则, node = null
        // 如果不成功, 则可能其他线程已经在插入数据了,
        // 此时会尝试继续获取锁tryLock, 自旋MAX_SCAN_RETRIES次, 若还是拿不到锁才开始lock
        ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            ConcurrentHashMap.HashEntry<K,V>[] tab = table;
            // 获取分区中哪一个entry链的index
            int index = (tab.length - 1) & hash;
            // 获取第一个entry
            ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
            for (ConcurrentHashMap.HashEntry<K,V> e = first;;) {
                // e != null , 存在hash冲突, 把他加到当前链表中
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    // 无hash冲突, new entry
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    // 空间大小超出阈值, 需要rehash, 翻倍空间.
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        //放到分区中
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
 
BlockingQueue
阻塞队列, 主要用于多线程之间共享数据.
 
当一个线程读取数据时, 如果队列是空的, 则当前线程会进入等待状态.
 
如果队列满了, 当一个线程尝试写入数据时, 同样会进入等待状态.
 
适用于生产消费者模型.
 
 
 
其源码实现也相对简单.
 
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列满了, 写进入等待
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
 
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 队列空的, 读进入等待
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
 
因为BlockingQueue在put take等操作有锁, 因此非高性能容器,
 
如果需要高并发支持的队列, 则可以使用ConcurrentLinkedQueue. 他内部也是运用了大量无锁操作.
 
CopyOnWriteArrayList
CopyOnWriteArrayList通过在新增元素时, 复制一份新的数组出来, 并在其中写入数据, 之后将原数组引用指向到新数组.
 
其Add操作是在内部通过ReentrantLock进行锁保护, 防止多线程场景复制多份数组.
 
而Read操作内部无锁, 直接返回数组引用, 并发下效率高, 因此适用于读多写少的场景.
 
源码
 
public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // 写数据的锁
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 复制到新的数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            // 加入新元素
            newElements[len] = e;
            // 修改引用
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
 
    final void setArray(Object[] a) {
        array = a;
    }
 
    // 读的时候无锁
    public E get(int index) {
        return get(getArray(), index);
    }
 
示例
 
使用10个读线程, 100个写线程. 如果使用ArrayList实现, 那么有可能是在运行过程中抛出ConcurrentModificationException.
 
原因很简单, ArrayList在遍历的时候会check modCount是否发生变化, 如果一边读一边写就会抛异常.
 
public class CopyOnWriteListDemo {
 
    static List<UUID> list = new CopyOnWriteArrayList<UUID>();
//    static List<UUID> list = new ArrayList<UUID>();
 
    // 往list中写数据
    public static class AddThread implements Runnable {
 
        @Override
        public void run() {
            UUID uuid = UUID.randomUUID();
            list.add(uuid);
            System.out.println("++Add uuid : " + uuid);
 
        }
    }
 
    // 从list中读数据
    public static class ReadThread implements Runnable {
 
        @Override
        public void run() {
            System.out.println("start read size: " + list.size() + " thread : " + Thread.currentThread().getName());
            for (UUID uuid : list) {
                System.out.println("Read uuid : " + uuid + " size : " + list.size() + "thread: " + Thread.currentThread().getName());
            }
        }
    }
 
 
    public static void main(String[] args) throws InterruptedException {
        initThread(new AddThread(), 10);
        initThread(new ReadThread(), 100);
    }
 
    private static void initThread(Runnable runnable, int maxNum) throws InterruptedException {
        Thread[] ts = new Thread[maxNum];
        for (int k = 0; k < maxNum; k++) {
            ts[k] = new Thread(runnable);
        }
        for (int k = 0; k < maxNum; k++) {
            ts[k].start();
        }
    }
}

(编辑:三明站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!