Java 同步工具与组合类的线程安全性分析( 五 )


InterruptedException同步工具类Java 还提供了诸如信号量 ( Semaphore ),栅栏 ( Barrier ),以及闭锁 ( Latch ) 作为同步工具类,它们都包含了一定的结构性属性:这些状态将决定执行同步工具类的线程是执行还是等待 。
闭锁闭锁是一种同步工具类,可以延迟线程的进度直到闭锁打开 。在此之前,所有的线程必须等待,而在闭锁结束之后,这个锁将永久保持打开状态 。这个特性适用于 需要确保某个任务的前序任务 ( 比如初始化 ) 全部完成之后才可以执行的场合,见下方的代码:Worker 线程等待另两个初始化线程准备就绪之后输出 p 的结果 。
// class Point{int x,y;}final var p = new Point();final var p_latch = new CountDownLatch(2);// Workernew Thread(()->{try {p_latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("Point(x=%d,y=%d)",p.x,p.y);}).start();// Init xnew Thread(()->{p.x = 1;p_latch.countDown();}).start();// Init ynew Thread(()->{p.y = 2;p_latch.countDown();}).start();复制代码FutureTask 也可以拿来做闭锁,它实现了 Future 的语义,表示一个抽象的可生成结果的计算,一般需要由线程池驱动执行,表示一个异步的任务 。
Runnable 接口表示无返回值的计算,Callable<T> 代表有返回值的计算 。
final var futurePoint = new FutureTask<>(()->new Point(1,2));new Thread(futurePoint).start();new Thread(()->{try {// 在 Callable 计算出结果之前阻塞var p = futurePoint.get();System.out.printf("Point(x=%d,y=%d)",p.x,p.y);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}).start();复制代码信号量计数信号量用于控制某个资源的同时访问数量,通常用于配置有容量限制的资源池,或称有界阻塞容器 。Semaphore 管理一组许可,线程在需要时首先获取许可,并在操作结束之后归还许可 。如果许可数量被耗尽,那么线程则必须要阻塞到其它任意线程归还许可 ( 默认情况下遵循 Non-Fair 策略 ) 为止 。特别地,当信号量的许可数为 1 时,则可认为是不可重入的互斥锁 。
下面是一个利用信号量 + 同步容器实现的简易阻塞队列:
class BoundedBlockingQueue<E>{final private List<E> list = Collections.synchronizedList(new LinkedList<>());final private Semaphore se;public BoundedBlockingQueue(int cap){se = new Semaphore(cap);}public void enqueue(E e) throws InterruptedException {se.acquire();list.add(0,e);}public E dequeue(){final var done = list.remove(0);se.release();return done;}@Overridepublic String toString() {return "BoundedBlockingQueue{" +"list=" + list +'}';}}复制代码栅栏栅栏 ( Barrier ) 类似于闭锁,同样都会阻塞到某一个事件发生 。闭锁强调等待某个事件发生之后再执行动作,而栅栏更强调在某个事件发生之前等待其它线程 。它可用于实现一些协议:"所有人在指定的时间去会议室碰头,等到所有的人到齐之后再开会",比如数据库事务的两阶段提交 。
Java 提供了一个名为 CyclicBarrier 的栅栏,它指定了 N 个工作线程 反复地 在栅栏位置汇集 。在某线程执行完毕之后,调用 await() 方法阻塞自身,以等待其它更慢的线程到达栅栏位置 。当设定的 N 个线程均调用 await() 之后,栅栏将打开,此时所有的线程将可以继续向下执行代码,而栅栏本身的状态会重置,以便复用 ( 因而命名为 Cyclic- ) 。
见下面的代码,4 个线程并行执行初始化工作 ( 以随机时间的 sleep 模拟延迟 ),并等待所有线程初始化完毕之后同时打印信息 。
final int N = 4;final var barrier =new CyclicBarrier(N);final Thread[] workers = new Thread[N];for(var i : new Integer[]{0,1,2,3}){var t = new Thread(()->{try {// 模拟随机的延时var rdm = new Random().nextInt(1000);Thread.sleep(rdm);// 在所有其它线程到达之前阻塞barrier.await();// 所有线程到达之后执行,每个线程打印延时时间System.out.printf("prepare for %d millisn",rdm);} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}});workers[i] = t;t.start();}// 等待所有的任务并行执行完毕 。for(var worker : workers){worker.join();}复制代码在不涉及 IO 操作和数据共享的计算问题当中,线程数量为 N CPU 或者 N CPU + 1 时会获得最优的吞吐量,更多的线程也不会带来带来帮助,甚至性能还会下降,因为 CPU 需要频繁的切换上下文 。
一旦线程成功地到达栅栏,则 await() 方法会其标记为 "子线程" 。 CyclicBarrier 的构造器还接受额外的 Runnable 接口做回调函数,当所有线程全部到达栅栏之后, CyclicBarrier 会从子线程当中挑选出一个领导线程去执行它 ( 即,每一轮通过栅栏之后,它都会被执行且仅一次 ),我们可以在此实现日志记录等操作 。


推荐阅读