...
主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output> 创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)
UML
...
从精简的伪代码理解
代码块 |
---|
impl TaskState for MyTaskState { // FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll // 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述) fn new_sub_task(&self) -> future { let fut = async { do_something_in_async(); // 业务代码 }; fut } // 前一个结束,则后一个开始,返回新的 TaskState对象 fn next(self) -> Self { match self.result { A_result => AnotherMyTaskStateA::new() B_result => AnotherMyTaskStateB::new() } } } let state = MyTaskState::new(); // 交给 TaskGenerator 跑异步流程 let generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state); generator.await; // 初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState 返回的 future |
...