探索 Rust 异步简化编程( 四 )


探索 Rust 异步简化编程

文章插图
 
通过生成的方式增加并发利用今天的异步Rust,应用程序可以通过利用select!或FutureUnordered生成新任务的方式增加并发 。到目前为止,我们讨论了任务生成和select! 。我建议去掉FuturesUnordered,因为它经常会导致bug 。在使用FutureUnordered时,很容易认为生成的任务会在后台执行,然后出乎意料地发现这些任务不会有任何进展 。
相反,我们可以利用带有作用域的任务实现类似的方案:
let greeting = "Hello".to_string;task::scope(async |scope| { let mut task_set = scope.task_set; for i in 0..10 { task_set.spawn(async { println!("{} from task {}", greeting, i); i }); } async for res in task_set { println!("task completed {:?}", res); }});每个生成的任务都会并发执行,从生成者那里借用数据,而TaskSet能提供一个类似于FuturesUnordered,但不会导致灾难的API 。至于缓存流等其他原语也可以在带有作用域的任务上实现 。
还可以在这些原语之上实现一些新的并发原语 。例如,可以实现类似于Kotlin的结构化并发 。之前有人曾讨论过这个问题(
https://github.com/tokio-rs/tokio/issues/1879),但异步Rust的当前模型无法实现这一点 。而将异步Rust改为保证完成,就能解锁这一领域 。
探索 Rust 异步简化编程

文章插图
 
select!怎么办?本文开头我说过,使用异步编程可以更有效地对复杂的流程控制进行建模 。目前最有效的原语为select! 。我还提议,将select!改为只接受类似于通道的类型,这样可以强制同学A为每个连接生成两个任务,实现读写的并发性 。生成任务能防止在取消读操作的时候出现bug,还能重写读操作,以处理意料之外的取消 。例如,mini-redis在解析帧的时候,我们首先将接收到的数据保存到缓冲区中 。当读操作被取消时,位于缓冲区中的数据不会丢失 。下次调用读操作会从中断的地方继续 。因此Mini-redis的读操作对于中止是安全的(abort-safe) 。
如果不将select!限制在类似于通道的类型上,而是将其限制在对于中止是安全的操作上,会怎样?从通道中接收数据是中止安全的,但从带有缓冲区的I/O处理函数中读取也是中止安全的 。这里的关键是,不应该假设所有异步操作都是中止安全的,而是应该要求开发者向函数定义中添加#[abort_safe](或async(abort)) 。这种策略有几个好处 。首先,当同学A学习异步Rust时,它不需要知道任何有关安全性的概念 。即使不理解这个概念,仅通过生成任务来获得并发性,也可以实现一切:
#[abort_safe]async fn read_line(&mut self) -> io::Result<Option<String>> { loop { // Consume a full line from the buffer if let Some(line) = self.parse_line? { return Ok(line); } // Not enough data has been buffered to parse a full line if 0 == self.socket.read_buf(&mut self.buffer)? { // The remote closed the connection. if self.buffer.is_empty { return Ok(None); } else { return Err("connection reset by peer".into); } } }}不再默认要求中止安全语句,而是由开发者自行标注 。这种自行标注的策略符合撤销安全性的模式 。当新的开发者阅读代码时,这个标注会告诉他们该函数必须保证中止安全 。rust编译器甚至可以对于标注了#[abort_safe]的函数提供额外的检查和警告 。
现在同学A可以在select!的循环中使用read_line了:
loop { select! { line_in = connection.read_line? => { if let Some(line_in) = line_in { broadcast_line(line_in); } else { // connection closed, exit loop break; } } line_out = channel.recv => { connection.write_line(line_out)?; } }}
探索 Rust 异步简化编程

文章插图
 
混合使用中止安全和非中止安全#[abort_safe]注释引入了两个异步语句的变种 。混合使用中止安全和非中止安全需要特别考虑 。不论从中止安全还是从非中止安全的上下文中,都可以调用一个中止安全的函数 。然而,Rust编译器会阻止从中止安全的上下文中调用非中止安全的函数,并提供一个有帮助的错误信息:


推荐阅读