final var barrier =new CyclicBarrier(N,()->{System.out.println("all runners ready");});复制代码
在并行任务中构建高效的缓存为了用简单的例子说明问题,我们在这里特别强调并行 ( Parallel ) 任务,这些任务的计算过程是纯粹 ( Pure ) 的 —— 这样的函数被称之纯函数 。无论它们何时被调用,被哪个线程调用,同样的输入永远得到同样的输出 。纯函数不和外部环境交互,因此自然也就不存在竞态条件 。
一个非常自然的想法是使用缓存 ( 或称记忆机制 Memorized ) 避免重复的运算 。在纯粹的映射关系中,固定的输入总是对应固定的输出,因此使用 K-V 键值对来记忆结果再好不过了 。我们基于 HashMap 给出最简单的一版实现,然后再探讨如何改进它们 。
class MapCacheV1 {private final HashMap<Integer,String> cache = new HashMap<>();public synchronized String getResult(Integer id){var v = cache.get(id);if (v == null){// 设定中,这个静态方法具有 500ms 左右的延迟 。v = PURE.slowOperation(id);cache.put(id,v);}return v;}}复制代码
尽管我们打算将 MapCache 用于无竞态条件的并行任务,但 getResult() 方法仍然加上了同步锁,因为 HashMap 本身不是线程安全的, cache 需要以安全的方式被并发访问 。然而,这种做法无疑会使得 getResult() 方法变得十分笨重,因为原本可以并行的慢操作 PURE.slowOperation() 也被锁在了代码块内部 。
最先想到的是使用更加高效的 ConcurrentHashMap 类取代线程不安全的 HashMap ,以获得免费的多线程性能提升:
class MapCacheV2 {private final ConcurrentHashMap<Integer,String> cache = new ConcurrentHashMap<>();public String getResult(Integer id){var v = cache.get(id);if(v == null){v = PURE.slowOperation(id);cache.put(id,v);}return v;}}复制代码
同时,我们这一次取消掉了 getResult() 上的同步锁 。这样,多线程可以并行地执行慢操作,只在修改 cache 时发生竞争 。但这个缓存仍有一些不足 —— 当某个线程 A 在计算新值时 ( 即这 500ms 之内 ),其它线程并不知道 。因此,多个线程有可能会计算同一个新值,甚至导致其它的计算任务无法进行 。
针对这个问题,我们再一次提出改进 。不妨让 cache 保存 "计算过程",而非值 。这样,工作线程将有三种行为:
- 缓存中没有此计算任务,注册并执行 。
- 缓存中有此计算任务,但未完毕,当前线程阻塞 ( 将 CPU 让给其它需要计算的线程 ) 。
- 缓存中有此计算任务,且已计算完毕,直接返回 。
class MapCacheV3 {private final ConcurrentHashMap<Integer,FutureTask<String>> cache = new ConcurrentHashMap<>();public String getResult(Integer id) throws ExecutionException, InterruptedException {// 获取一个计算任务,而非值final var task = cache.get(id);if(task == null){final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));// cache.putIfAbsent()cache.put(id,newTask);newTask.run();// 提交并执行任务 。return newTask.get();}else return task.get();}}复制代码
MapCacheV3 的实现已经近乎完美了 。唯一不足的是:我们对 cache 的操作仍然是 "先判断后执行" 的复合操作,但现在 getResult 并没有同步锁的保护 。两个线程仍然同时调用 cache.get() 并判空,并开始执行重复的计算 。下面的版本给出了最终的解决方案:使用 ConcurrentMap 的 putIfAbsent() 原子方法修复可能重复添加计算任务的问题 。
public String getResult(Integer id) throws ExecutionException, InterruptedException {// 获取一个计算任务,而非值final var task = cache.get(id);if(task == null){final var newTask = new FutureTask<>(()-> PURE.slowOperation(id));// put 和 putIfAbsent 方法均会返回此 Key 对应的上一个旧值 Value 。// 如果 put 的是一个新的 Key,则返回值为 null 。final var maybeNull = cache.putIfAbsent(id,newTask);if(maybeNull == null) newTask.run();return newTask.get();}else return task.get();}复制代码
值得注意的是,一旦 cache 存储的是计算任务而非值,那么就可能存在缓存污染的问题 。一旦某个 FutureTask 的计算被取消,或者失败,应当及时将它从缓存中移除以保证将来的计算成功,而不是放任其驻留在缓存内部返回失败的结果 。缓存思想几乎应用在各个地方 。比如在 Web 服务中,用户的数据往往不会总是直接来自数据库,而是 redis 这样的消息中间件 。在实际的应用环境下,还有更加复杂的问题需要被考虑到,比如缓存内容过时 ( expired ),或者是定期清理缓存空间等 。
推荐阅读
- 使用 JavaScript 实现无限滚动
- 如何自己开发漏洞扫描工具
- Firefox浏览器|火狐浏览器新增实用工具:将支持图片文字识别
- Java|Java:2022年招聘Java开发人员指南
- Java|HR傲慢对待求职者,还“诅咒”对方找不到工作,大学生也太难了
- 什么资源都能搜?这款搜索引擎工具,一键检索各大网盘资源
- 在Java 8及更高版本中使用Java流
- Java实现第三方短信接口发送短信验证码
- 3个超强资源搜索工具 资源搜索工具
- 连裤袜|“颜值经济”盛行,美妆工具受追捧,我国美妆工具市场需求增势明显