...
实现了一个异步任务框架,只需要实现 TaskState 和 Generate 这两个 trait 就可以异步执行并在错误的情况下可以重试。
trait TaskState
即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。
TaskResultCollector<Item>
即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:
代码块 |
---|
// 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
fn collect(&mut self, item: Item) -> Result<CollectorState>;
// 结束的时候调用
fn finish(self) -> Result<Self::Output>; |
实际上,整个异步框架了解以上两个类就足够,以下几个主要用于辅助以上两个类运作。
FutureTaskStream<S>
即 stream 类,我们在里面封装了 TaskState,stream.await 调用的时候,就会在 poll_next 里面调用 TaskState 的 new_sub_task 并返回 future 对象,最终异步执行 TaskState 的 future。每次执行完,都会调用 TaskState 的 next 函数,next 函数会根据当前 TaskState 的执行状态生成新的 TaskState 供 stream 执行。
trait
...
即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。
实际上,整个异步框架了解以上两个类就足够,以下两个主要用于辅助以上两个类运作。
trait Generator 和 struct TaskGenerator<S, C>
...
主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output> 创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)
UML
...
从精简的伪代码理解
代码块 | ||
---|---|---|
| ||
// 任务,其中的 future 返回 item impl TaskState for MyTaskState { // FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll // 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述) fnfn new_sub_task(&self) -> future { let fut = async { dodo_something_in_async(); // 业务代码,返回 item 业务代码 }; futfut } // 前一个结束,则后一个开始,返回新的 TaskState对象 fnfn next(self) -> Self { { match self.result { A_result => AnotherMyTaskStateA::new() B_result => AnotherMyTaskStateB::new() } } } } // 任务结果,处理每个 future 的结果 item impl TaskResultCollector<Item> for MyTaskStateOutputCollect { // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果 fn collect(&mut self, item: Item) -> Result<CollectorState> { self.storage.save(item); } // 结束的时候调用 fn finish(self) -> Result<Self::Output> { } let self.storage.flush(); } } // 以下是使用方法 let state = MyTaskState::new(); // 交给 TaskGenerator 跑异步流程 letlet generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state); generator.await; // 初始化1、初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState::new_sub_task 返回的 future // 2、所有 future 结束后,调用 TaskResultCollector 的 collect 存储结果 // 以上两步都封装好在 TaskGenerator 中异步执行 generator.await; |