Java新的结构化并行模式入门指南( 二 )


Got a Planet: {"name":"Alderaan"}
BEGIN getPlanet()
Got a Planet: {"name":"Yavin”}
BEGIN getPlanet()
Got a Planet: {"name":"Hoth"}
BEGIN getPlanet()
Got a Planet: {"name":"Dagobah"}
现在不妨使用结构化并发尝试同一个示例 。如代码片段2所示 , 结构化并发允许我们将调用分解成并发请求,并将所有内容放在相同的代码空间中 。在代码片段2中,我们添加了必要的StructuredTaskScope导入,然后使用其核心方法fork()和join(),将每个请求分解成各自的线程,然后等待它们全部完成 。
代码片段2. 使用StructuredTaskScopeNow的多API调用
复制
package com.infoworld;
import java.util.concurrent.*;
import java.util.concurrent.StructuredTaskScope.*;
//...
public class App {
public String getPlanet(int planetId) throws Exception {
// ... same ...
}
void sync() throws Exception {
int[] planetIds = {1,2,3,4,5};
for (int planetId : planetIds) {
getPlanet(planetId);
}
}
void sc() throws Exception {
int[] planetIds = {1,2,3,4,5};
try (var scope = new StructuredTaskScope<Object>()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
// ...
System.out.println("nr-- BEGIN Structured Concurrency");
try {
myApp.sc();
} catch (Exception e){
System.err.println("Error: " + e);
}
}
}
如果我们运行代码片段2,将得到类似的输出 , 但速度要快不少 , 这是由于请求是同时发出、并发进行的 。不妨考虑sc()方法(使用多线程)与sync()方法(使用同步代码)之间的区别 。结构化并发方法没有想象的那么难,提供结果的速度却快得多 。
处理任务和子任务默认情况下,StructuredTaskScope被创建时,它使用虚拟线程,所以我们实际上并没有在这里配置操作系统线程;相反,我们告诉JVM以最有效的方式编排请求 。(StructuredTaskScope的构造函数也接受ThreadFactory 。)
在代码片段2中,我们在try-with-resource块中创建StructuredTaskScope对象,这是它原本的使用方式 。我们可以使用fork()创建任意数量的作业 。fork()方法接受任何实现Callable的程序 , 也就是说 , 任何方法或函数 。这里,我们将getPlanet()方法包装在一个匿名函数中:()-> getPlanet(planetId)——这是一种向目标函数传递参数的实用语法 。
当我们调用join()时 , 我们告诉作用域等待所有被分叉的作业 。实质上,join()将我们带回到同步模式 。分叉的作业将按照TaskScope的配置进行处理 。
关闭任务作用域
由于我们在try-with-resource块中创建了TaskScope , 因此当该块结束时 , 作用域将自动关闭 。这为作用域调用shutdown()进程 , 作用域可以定制 , 以便根据需要来处理运行中线程的处置 。如果需要在作用域关闭之前关闭它,也可以手动调用shutdown()方法 。
StructuredTaskScope包括两个实现内置关闭策略的类:ShutDownOnSuccess和ShutDownOnFailure 。这些类监视成功或出错的子任务,然后取消其余运行中的线程 。使用目前的设置,我们可以这样使用这些类:
代码片段3. 内置关闭策略
复制
void failFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2,3,-1,4};
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
}
}
void succeedFast() throws ExecutionException, InterruptedException {
int[] planetIds = {1,2};
try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
for (int planetId : planetIds) {
scope.fork(() -> getPlanet(planetId));
}
scope.join();
} catch (Exception e){
System.out.println("Error: " + e);
}
}
public static void main(String[] args) {
var myApp = new App();
System.out.println("nr-- BEGIN succeedFast");
try {
myApp. succeedFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("nr-- BEGIN failFast");
try {
myApp.failFast();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}


推荐阅读