版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output>  创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)

UML

...

从精简的伪代码理解

代码块
 impl TaskState for MyTaskState {
   //  FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll
   // 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述)
   fn new_sub_task(&self) -> future { 
      let fut = async {
         do_something_in_async(); // 业务代码
     };
     fut
   } 
   //  前一个结束,则后一个开始,返回新的 TaskState对象
   fn next(self) -> Self {
      match self.result {
        A_result => AnotherMyTaskStateA::new()
        B_result => AnotherMyTaskStateB::new()
     }
   }           
 }
 ​
 let state = MyTaskState::new();
 ​
 // 交给 TaskGenerator 跑异步流程
 let generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state);
 ​
 generator.await; // 初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState 返回的 future

...