版本比较

密钥

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

其 impl 了官方的 Actor 和 Handle trait,也就是所有的异步消息都会走 ServiceActor<S>,其中 S 是我们应用层真正实现业务的类,例如,如果我们想新建一个 XXX service,那么只需要:

代码块
languagerust
 // 定义service 和 创建工厂类
 // impl ActorService trait,这样才可以给到 ServiceActor 中的 proxy 控制
 // impl EventHandler<S, E> 和 ServiceHandler<S, R> 处理消息
 struct XXXService; 
 ​
 // 实现 ServiceFactory<S>::create 函数,生成 serivce
 struct XXXServiceFactory; 
 ​
 // 使用,在 init_system() 函数中,通过 registry 对象进行注册启动 service
 registry.register_by_factory::<XXXService, XXXServiceFactory>().await?; // 此处会开启一个线程处理XXX Service 感兴趣的消息

...

stream 即流,类似比较熟悉的概念即 iterator,即 iterator 有的函数 stream 也有(比如,map,collect 等),也就是说 stream 一种集合,如果我们看 stream 的定义,会发现有一个和 next(iterator 的函数) 和 poll(future 的函数) 类似的函数:

代码块
languagerust
  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

如果我们调用了stream 的 await,那么它的 poll_next 就会被触发,其返回也是 Ready 或者 Penndig,关联集合中的某一项(Item)。当所有的集合元素都被 poll_next 一遍后,就返回 Ready(None)。相比之前 for 循环同步遍历 iterator,stream 这些动作都是异步执行,如下(伪代码):

代码块
languagerust
 let s = stream::new();
 while let some(item) = s.await { // await 触发 next_poll 调用,有 item 则返回,没 item 则挂起等待
   println!(item);
 }

...

简单说,就是相比单个 future, stream 实现了批量异步执行子任务。如下(伪代码):

代码块
languagerust
 let s = stream::new();
 while let some(future) = s.await { // await 触发 next_poll 调用,有 future 则返回,没 future 则挂起等待
   let result = future.await;  //  触发 poll,若返回 pending,则挂起
   println!(result);
 }

...

1、stream 的 item 不是 future 对象,使用 iter,iter 函数返回 Collect 对象;它帮我们实现了异步 await item 的流程,关键源代码如下:

代码块
languagerust
 impl<St, C> Future for Collect<St, C>
 where
     St: Stream,
     C: Default + Extend<St::Item>,
 {
     type Output = C;
 ​
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
         let mut this = self.as_mut().project();
         loop {
             match ready!(this.stream.as_mut().poll_next(cx)) {  // 调用了 stream 的 poll_next,异步拉取 item
                 Some(e) => this.collection.extend(Some(e)),
                 None => return Poll::Ready(self.finish()),
             }
         }
     }
 }

2、stream 的 item 是future 对象,使用 buffered,buffered 函数返回 Buffered 对象;它帮我们实现了启动 future await 的流程:

代码块
languagerust
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
 ​
         // First up, try to spawn off as many futures as possible by filling up
         // our queue of futures.
         while this.in_progress_queue.len() < *this.max {
             match this.stream.as_mut().poll_next(cx) {
                 // 已经 ready 有对象实体的,进入执行队列
                 Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), 
                 Poll::Ready(None) | Poll::Pending => break,
             }
         }
 ​
         // 拉起执行队列中的 future 
         // Attempt to pull the next value from the in_progress_queue
         let res = this.in_progress_queue.poll_next_unpin(cx);
         if let Some(val) = ready!(res) {
             return Poll::Ready(Some(val));
         }
 ​
         // If more values are still coming from the stream, we're not done yet
         if this.stream.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
         }
     }

...

stream 的 iter 和 buffer 函数返回 future 异步的去遍历 item,但其返回是所有的 item 集合,是一次性的,也就是 stream 没有给我们展现中间过程的异步处理。如果我们想每处理一个 item 就做一些事情,就需要 sink trait。sink 可以让我们监控整个 stream 内部遍历 item 的动作,翻看 sink 的 trait 声明会看到:

代码块
languagerust
 pub trait Sink<Item> {
     /// The type of value produced by the sink when an error occurs.
     type Error;
 ​
     // start_send 之前调用
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
     
     // 处理某个 item 之前调用
     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
 ​
     // 所有 item 都过一遍后调用,若还有 item 处于挂起,则返回 pending,否则若执行返回 ready
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
 ​
     // 结束前调用
     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
 }

有了 sink,就可以监控对 item 的处理,例如(伪代码):

代码块
languagerust
 let s = stream::new();
 ​
 // stream 转为 map<stream, future>,这里的 future 即在 sink 中异步去调用 stream await
 let buffered = s.buffered().map(ok).collect(); 
 let sink = sink.send_all(buffered);
 sink.await; // 异步调用 stream.await,每处理一个 item,就回调一下 sink trait 对应的函数

...

即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:

代码块
languagerust
 // 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
 fn collect(&mut self, item: Item) -> Result<CollectorState>;
 ​
 // 结束的时候调用
 fn finish(self) -> Result<Self::Output>;

...