版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

目录

基于 actix

官方的 actix

首先应该先根据官方的文档,熟悉基本的 actix 框架,包括:actor,address,contex,arbiter 和 sync arbiter 这些基本概念。

...

其 impl 了官方的 Actor 和 Handle trait,也就是所有的异步消息都会走 ServiceActor<S>,其中 S 是我们应用层真正实现业务的类,例如,如果我们想新建一个 XXX service,那么只需要:

代码块
languagerust
 // 定义service 和 创建工厂类
 // impl ActorService trait,这样才可以给到 ServiceActor 中的 proxy 控制
 // impl EventHandler<S, E> 和 ServiceHandler<S, R> 处理消息
 struct XXXService; 
 ​
 // 实现 ServiceFactory<S>::create 函数,生成 serivce
 struct XXXServiceFactory; 
 ​
 // 使用,在 init_system() 函数中,通过 registry 对象进行注册启动 service
 registry.register_by_factory::<XXXService, XXXServiceFactory>().await?; // 此处会开启一个线程处理XXX Service 感兴趣的消息

...

stream 即流,类似比较熟悉的概念即 iterator,即 iterator 有的函数 stream 也有(比如,map,collect 等),也就是说 stream 一种集合,如果我们看 stream 的定义,会发现有一个和 next(iterator 的函数) 和 poll(future 的函数) 类似的函数:

代码块
languagerust
  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

如果我们调用了stream 的 await,那么它的 poll_next 就会被触发,其返回也是 Ready 或者 Penndig,关联集合中的某一项(Item)。当所有的集合元素都被 poll_next 一遍后,就返回 Ready(None)。相比之前 for 循环同步遍历 iterator,stream 这些动作都是异步执行,如下(伪代码):

代码块
languagerust
 let s = stream::new();
 while let some(item) = s.await { // await 触发 next_poll 调用,有 item 则返回,没 item 则挂起等待
   println!(item);
 }

...

简单说,就是相比单个 future, stream 实现了批量异步执行子任务。如下(伪代码):

代码块
languagerust
 let s = stream::new();
 while let some(future) = s.await { // await 触发 next_poll 调用,有 future 则返回,没 future 则挂起等待
   let result = future.await;  //  触发 poll,若返回 pending,则挂起
   println!(result);
 }

(待补充图)Image Added

iter 函数 和 buffered 函数

...

1、stream 的 item 不是 future 对象,使用 iter,iter 函数返回 Collect 对象;它帮我们实现了异步 await item 的流程,关键源代码如下:

代码块
languagerust
 impl<St, C> Future for Collect<St, C>
 where
     St: Stream,
     C: Default + Extend<St::Item>,
 {
     type Output = C;
 ​
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
         let mut this = self.as_mut().project();
         loop {
             match ready!(this.stream.as_mut().poll_next(cx)) {  // 调用了 stream 的 poll_next,异步拉取 item
                 Some(e) => this.collection.extend(Some(e)),
                 None => return Poll::Ready(self.finish()),
             }
         }
     }
 }

2、stream 的 item 是future 对象,使用 buffered,buffered 函数返回 Buffered 对象;它帮我们实现了启动 future await 的流程:

代码块
languagerust
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
 ​
         // First up, try to spawn off as many futures as possible by filling up
         // our queue of futures.
         while this.in_progress_queue.len() < *this.max {
             match this.stream.as_mut().poll_next(cx) {
                 // 已经 ready 有对象实体的,进入执行队列
                 Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), 
                 Poll::Ready(None) | Poll::Pending => break,
             }
         }
 ​
         // 拉起执行队列中的 future 
         // Attempt to pull the next value from the in_progress_queue
         let res = this.in_progress_queue.poll_next_unpin(cx);
         if let Some(val) = ready!(res) {
             return Poll::Ready(Some(val));
         }
 ​
         // If more values are still coming from the stream, we're not done yet
         if this.stream.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
         }
     }

...

stream 的 iter 和 buffer 函数返回 future 异步的去遍历 item,但其返回是所有的 item 集合,是一次性的,也就是 stream 没有给我们展现中间过程的异步处理。如果我们想每处理一个 item 就做一些事情,就需要 sink trait。sink 可以让我们监控整个 stream 内部遍历 item 的动作,翻看 sink 的 trait 声明会看到:

代码块
languagerust
 pub trait Sink<Item> {
     /// The type of value produced by the sink when an error occurs.
     type Error;
 ​
     // start_send 之前调用
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
     
     // 处理某个 item 之前调用
     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
 ​
     // 所有 item 都过一遍后调用,若还有 item 处于挂起,则返回 pending,否则若执行返回 ready
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
 ​
     // 结束前调用
     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
 }

有了 sink,就可以监控对 item 的处理,例如(伪代码):

代码块
languagerust
 let s = stream::new();
 ​
 // stream 转为 map<stream, future>,这里的 future 即在 sink 中异步去调用 stream await
 let buffered = s.buffered().map(ok).collect(); 
 let sink = sink.send_all(buffered);
 sink.await; // 异步调用 stream.await,每处理一个 item,就回调一下 sink trait 对应的函数

...

实现了一个异步任务框架,只需要实现 TaskState 和 Generate 这两个 trait 就可以异步执行并在错误的情况下可以重试。

trait TaskState

FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。

TaskResultCollector<Item>

collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:

代码块
languagerust
 // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
 fn collect(&mut self, item: Item) -> Result<CollectorState>;
 ​
 // 结束的时候调用
 fn finish(self) -> Result<Self::Output>;

实际上,整个异步框架了解以上两个类就足够,以下几个主要用于辅助以上两个类运作。

FutureTaskStream<S>

stream 类,我们在里面封装了 TaskState,stream.await 调用的时候,就会在 poll_next 里面调用 TaskState 的 new_sub_task 并返回 future 对象,最终异步执行 TaskState 的 future。每次执行完,都会调用 TaskState 的 next 函数,next 函数会根据当前 TaskState 的执行状态生成新的 TaskState 供 stream 执行。

trait TaskState

即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。

实际上,整个异步框架了解以上两个类就足够,以下两个主要用于辅助以上两个类运作。

trait Generator 和 struct TaskGenerator<S, C>

...

由 Generator 返回,对 future 进行了封装,可以获得 future 的 handle,与 TaskState 不同的是,TaskState 是用于批量执行子任务的,而 TaskFuture<Output> 更像是一个单独的 future 封装。

FutureTaskSink<Item, Output>

主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output>  创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)

UML

...

从精简的伪代码理解

代码块
languagerust
 // 任务,其中的 future 返回 item
impl TaskState for MyTaskState {
  //  FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll
  // 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述)
  fn new_sub_task(&self) -> future { 
     let fut = async {
        do_something_in_async(); // 业务代码,返回 item
    };
    fut
  }	
  //  前一个结束,则后一个开始,返回新的 TaskState对象
  fn next(self) -> Self	{
     match self.result {
       A_result => AnotherMyTaskStateA::new()
       B_result => AnotherMyTaskStateB::new()
    }
  }						
}

// 任务结果,处理每个 future 的结果 item
impl TaskResultCollector<Item> for MyTaskStateOutputCollect {
  // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
  fn collect(&mut self, item: Item) -> Result<CollectorState> {
    self.storage.save(item);  
  }

	// 结束的时候调用
	fn finish(self) -> Result<Self::Output> {
  	self.storage.flush();
  }
}

// 以下是使用方法
let state = MyTaskState::new();

// 交给 TaskGenerator 跑异步流程
let generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state);

// 1、初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState::new_sub_task 返回的 future
// 2、所有 future 结束后,调用 TaskResultCollector 的 collect 存储结果
// 以上两步都封装好在 TaskGenerator 中异步执行
generator.await;