...
其 impl 了官方的 Actor 和 Handle trait,也就是所有的异步消息都会走 ServiceActor<S>,其中 S 是我们应用层真正实现业务的类,例如,如果我们想新建一个 XXX service,那么只需要:
代码块 | ||
---|---|---|
| ||
// 定义service 和 创建工厂类 // impl ActorService trait,这样才可以给到 ServiceActor 中的 proxy 控制 // impl EventHandler<S, E> 和 ServiceHandler<S, R> 处理消息 struct XXXService; // 实现 ServiceFactory<S>::create 函数,生成 serivce struct XXXServiceFactory; // 使用,在 init_system() 函数中,通过 registry 对象进行注册启动 service registry.register_by_factory::<XXXService, XXXServiceFactory>().await?; // 此处会开启一个线程处理XXX Service 感兴趣的消息 |
...
stream 即流,类似比较熟悉的概念即 iterator,即 iterator 有的函数 stream 也有(比如,map,collect 等),也就是说 stream 一种集合,如果我们看 stream 的定义,会发现有一个和 next(iterator 的函数) 和 poll(future 的函数) 类似的函数:
代码块 | ||
---|---|---|
| ||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; |
如果我们调用了stream 的 await,那么它的 poll_next 就会被触发,其返回也是 Ready 或者 Penndig,关联集合中的某一项(Item)。当所有的集合元素都被 poll_next 一遍后,就返回 Ready(None)。相比之前 for 循环同步遍历 iterator,stream 这些动作都是异步执行,如下(伪代码):
代码块 | ||
---|---|---|
| ||
let s = stream::new(); while let some(item) = s.await { // await 触发 next_poll 调用,有 item 则返回,没 item 则挂起等待 println!(item); } |
...
简单说,就是相比单个 future, stream 实现了批量异步执行子任务。如下(伪代码):
代码块 | ||
---|---|---|
| ||
let s = stream::new(); while let some(future) = s.await { // await 触发 next_poll 调用,有 future 则返回,没 future 则挂起等待 let result = future.await; // 触发 poll,若返回 pending,则挂起 println!(result); } |
(待补充图)
iter 函数 和 buffered 函数
...
1、stream 的 item 不是 future 对象,使用 iter,iter 函数返回 Collect 对象;它帮我们实现了异步 await item 的流程,关键源代码如下:
代码块 | ||
---|---|---|
| ||
impl<St, C> Future for Collect<St, C> where St: Stream, C: Default + Extend<St::Item>, { type Output = C; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> { let mut this = self.as_mut().project(); loop { match ready!(this.stream.as_mut().poll_next(cx)) { // 调用了 stream 的 poll_next,异步拉取 item Some(e) => this.collection.extend(Some(e)), None => return Poll::Ready(self.finish()), } } } } |
2、stream 的 item 是future 对象,使用 buffered,buffered 函数返回 Buffered 对象;它帮我们实现了启动 future await 的流程:
代码块 | ||
---|---|---|
| ||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); // First up, try to spawn off as many futures as possible by filling up // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { // 已经 ready 有对象实体的,进入执行队列 Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } // 拉起执行队列中的 future // Attempt to pull the next value from the in_progress_queue let res = this.in_progress_queue.poll_next_unpin(cx); if let Some(val) = ready!(res) { return Poll::Ready(Some(val)); } // If more values are still coming from the stream, we're not done yet if this.stream.is_done() { Poll::Ready(None) } else { Poll::Pending } } |
...
stream 的 iter 和 buffer 函数返回 future 异步的去遍历 item,但其返回是所有的 item 集合,是一次性的,也就是 stream 没有给我们展现中间过程的异步处理。如果我们想每处理一个 item 就做一些事情,就需要 sink trait。sink 可以让我们监控整个 stream 内部遍历 item 的动作,翻看 sink 的 trait 声明会看到:
代码块 | ||
---|---|---|
| ||
pub trait Sink<Item> { /// The type of value produced by the sink when an error occurs. type Error; // start_send 之前调用 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; // 处理某个 item 之前调用 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; // 所有 item 都过一遍后调用,若还有 item 处于挂起,则返回 pending,否则若执行返回 ready fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; // 结束前调用 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; } |
有了 sink,就可以监控对 item 的处理,例如(伪代码):
代码块 | ||
---|---|---|
| ||
let s = stream::new(); // stream 转为 map<stream, future>,这里的 future 即在 sink 中异步去调用 stream await let buffered = s.buffered().map(ok).collect(); let sink = sink.send_all(buffered); sink.await; // 异步调用 stream.await,每处理一个 item,就回调一下 sink trait 对应的函数 |
...
实现了一个异步任务框架,只需要实现 TaskState 和 Generate 这两个 trait 就可以异步执行并在错误的情况下可以重试。
trait TaskState
即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。
TaskResultCollector<Item>
即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:
代码块 | ||
---|---|---|
| ||
// 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
fn collect(&mut self, item: Item) -> Result<CollectorState>;
// 结束的时候调用
fn finish(self) -> Result<Self::Output>; |
实际上,整个异步框架了解以上两个类就足够,以下几个主要用于辅助以上两个类运作。
FutureTaskStream<S>
即 stream 类,我们在里面封装了 TaskState,stream.await 调用的时候,就会在 poll_next 里面调用 TaskState 的 new_sub_task 并返回 future 对象,最终异步执行 TaskState 的 future。每次执行完,都会调用 TaskState 的 next 函数,next 函数会根据当前 TaskState 的执行状态生成新的 TaskState 供 stream 执行。
trait TaskState
即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。
实际上,整个异步框架了解以上两个类就足够,以下两个主要用于辅助以上两个类运作。
trait Generator 和 struct TaskGenerator<S, C>
...
由 Generator 返回,对 future 进行了封装,可以获得 future 的 handle,与 TaskState 不同的是,TaskState 是用于批量执行子任务的,而 TaskFuture<Output> 更像是一个单独的 future 封装。
FutureTaskSink<Item, Output>
主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output> 创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)
UML
...
从精简的伪代码理解
代码块 | ||
---|---|---|
| ||
// 任务,其中的 future 返回 item
impl TaskState for MyTaskState {
// FutureTaskStream<S> 的 poll_next 中会调用,并拉起 future 异步流程, 调用 future 的poll
// 检查 future 的返回值,最后调用 next 生成新的 TaskState(当然也可能重试,边角逻辑不再赘述)
fn new_sub_task(&self) -> future {
let fut = async {
do_something_in_async(); // 业务代码,返回 item
};
fut
}
// 前一个结束,则后一个开始,返回新的 TaskState对象
fn next(self) -> Self {
match self.result {
A_result => AnotherMyTaskStateA::new()
B_result => AnotherMyTaskStateB::new()
}
}
}
// 任务结果,处理每个 future 的结果 item
impl TaskResultCollector<Item> for MyTaskStateOutputCollect {
// 每获取一个结果就会调用一次,CollectorState 是 enough 则不再调用,是 need 继续处理下一个结果
fn collect(&mut self, item: Item) -> Result<CollectorState> {
self.storage.save(item);
}
// 结束的时候调用
fn finish(self) -> Result<Self::Output> {
self.storage.flush();
}
}
// 以下是使用方法
let state = MyTaskState::new();
// 交给 TaskGenerator 跑异步流程
let generator = TaskGenerator<MyTaskState, MyTaskStateOutputCollect>::generate(state);
// 1、初始化 stream,buffered,放入 sink 中异步执行 stream 的 TaskState::new_sub_task 返回的 future
// 2、所有 future 结束后,调用 TaskResultCollector 的 collect 存储结果
// 以上两步都封装好在 TaskGenerator 中异步执行
generator.await; |