...
其 impl 了官方的 Actor 和 Handle trait,也就是所有的异步消息都会走 ServiceActor<S>,其中 S 是我们应用层真正实现业务的类,例如,如果我们想新建一个 XXX service,那么只需要:
代码块 |
---|
|
// 定义service 和 创建工厂类
// impl ActorService trait,这样才可以给到 ServiceActor 中的 proxy 控制
// impl EventHandler<S, E> 和 ServiceHandler<S, R> 处理消息
struct XXXService;
// 实现 ServiceFactory<S>::create 函数,生成 serivce
struct XXXServiceFactory;
// 使用,在 init_system() 函数中,通过 registry 对象进行注册启动 service
registry.register_by_factory::<XXXService, XXXServiceFactory>().await?; // 此处会开启一个线程处理XXX Service 感兴趣的消息 |
...
stream 即流,类似比较熟悉的概念即 iterator,即 iterator 有的函数 stream 也有(比如,map,collect 等),也就是说 stream 一种集合,如果我们看 stream 的定义,会发现有一个和 next(iterator 的函数) 和 poll(future 的函数) 类似的函数:
代码块 |
---|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; |
如果我们调用了stream 的 await,那么它的 poll_next 就会被触发,其返回也是 Ready 或者 Penndig,关联集合中的某一项(Item)。当所有的集合元素都被 poll_next 一遍后,就返回 Ready(None)。相比之前 for 循环同步遍历 iterator,stream 这些动作都是异步执行,如下(伪代码):
代码块 |
---|
|
let s = stream::new();
while let some(item) = s.await { // await 触发 next_poll 调用,有 item 则返回,没 item 则挂起等待
println!(item);
} |
...
简单说,就是相比单个 future, stream 实现了批量异步执行子任务。如下(伪代码):
代码块 |
---|
|
let s = stream::new();
while let some(future) = s.await { // await 触发 next_poll 调用,有 future 则返回,没 future 则挂起等待
let result = future.await; // 触发 poll,若返回 pending,则挂起
println!(result);
} |
...
1、stream 的 item 不是 future 对象,使用 iter,iter 函数返回 Collect 对象;它帮我们实现了异步 await item 的流程,关键源代码如下:
代码块 |
---|
|
impl<St, C> Future for Collect<St, C>
where
St: Stream,
C: Default + Extend<St::Item>,
{
type Output = C;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) { // 调用了 stream 的 poll_next,异步拉取 item
Some(e) => this.collection.extend(Some(e)),
None => return Poll::Ready(self.finish()),
}
}
}
} |
2、stream 的 item 是future 对象,使用 buffered,buffered 函数返回 Buffered 对象;它帮我们实现了启动 future await 的流程:
代码块 |
---|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx) {
// 已经 ready 有对象实体的,进入执行队列
Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
// 拉起执行队列中的 future
// Attempt to pull the next value from the in_progress_queue
let res = this.in_progress_queue.poll_next_unpin(cx);
if let Some(val) = ready!(res) {
return Poll::Ready(Some(val));
}
// If more values are still coming from the stream, we're not done yet
if this.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
}
} |
...
stream 的 iter 和 buffer 函数返回 future 异步的去遍历 item,但其返回是所有的 item 集合,是一次性的,也就是 stream 没有给我们展现中间过程的异步处理。如果我们想每处理一个 item 就做一些事情,就需要 sink trait。sink 可以让我们监控整个 stream 内部遍历 item 的动作,翻看 sink 的 trait 声明会看到:
代码块 |
---|
|
pub trait Sink<Item> {
/// The type of value produced by the sink when an error occurs.
type Error;
// start_send 之前调用
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
// 处理某个 item 之前调用
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
// 所有 item 都过一遍后调用,若还有 item 处于挂起,则返回 pending,否则若执行返回 ready
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
// 结束前调用
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
} |
有了 sink,就可以监控对 item 的处理,例如(伪代码):
代码块 |
---|
|
let s = stream::new();
// stream 转为 map<stream, future>,这里的 future 即在 sink 中异步去调用 stream await
let buffered = s.buffered().map(ok).collect();
let sink = sink.send_all(buffered);
sink.await; // 异步调用 stream.await,每处理一个 item,就回调一下 sink trait 对应的函数 |
...
即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:
代码块 |
---|
|
// 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
fn collect(&mut self, item: Item) -> Result<CollectorState>;
// 结束的时候调用
fn finish(self) -> Result<Self::Output>; |
...