版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

实现了一个异步任务框架,只需要实现 TaskState 和 Generate 这两个 trait 就可以异步执行并在错误的情况下可以重试。

trait TaskState

即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。

TaskResultCollector<Item>

即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:

代码块
 // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
 fn collect(&mut self, item: Item) -> Result<CollectorState>;
 ​
 // 结束的时候调用
 fn finish(self) -> Result<Self::Output>;

实际上,整个异步框架了解以上两个类就足够,以下几个主要用于辅助以上两个类运作。

FutureTaskStream<S>

即 stream 类,我们在里面封装了 TaskState,stream.await 调用的时候,就会在 poll_next 里面调用 TaskState 的 new_sub_task 并返回 future 对象,最终异步执行 TaskState 的 future。每次执行完,都会调用 TaskState 的 next 函数,next 函数会根据当前 TaskState 的执行状态生成新的 TaskState 供 stream 执行。

trait

...

即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。

实际上,整个异步框架了解以上两个类就足够,以下两个主要用于辅助以上两个类运作。

trait Generator 和 struct TaskGenerator<S, C>

...

主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output>  创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)

UML

...

从精简的伪代码理解

 impl
代码块
languagerust
 // 任务,其中的 future 返回 item
impl TaskState for MyTaskState {
    //  FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll
    // 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述)
    fnfn new_sub_task(&self) -> future { 
        let fut = async {
             dodo_something_in_async(); // 业务代码,返回 item
业务代码      };
       futfut
   }	
    //  前一个结束,则后一个开始,返回新的 TaskState对象
    fnfn next(self) -> Self	{
 {       match self.result {
           A_result => AnotherMyTaskStateA::new()
           B_result => AnotherMyTaskStateB::new()
     }
 
    }
  }						
}

// 任务结果,处理每个 future 的结果 item
impl TaskResultCollector<Item> for MyTaskStateOutputCollect {
  // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
  fn collect(&mut self, item: Item) -> Result<CollectorState> {
    self.storage.save(item);  
  }

	// 结束的时候调用
	fn finish(self) -> Result<Self::Output> {
  
 }
 ​
 let	self.storage.flush();
  }
}

// 以下是使用方法
let state = MyTaskState::new();
 ​
 // 交给 TaskGenerator 跑异步流程
 letlet generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state);
 ​
 generator.await;
// 初始化1、初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState::new_sub_task 返回的 future
// 2、所有 future 结束后,调用 TaskResultCollector 的 collect 存储结果
// 以上两步都封装好在 TaskGenerator 中异步执行
generator.await;