/
starcoin 异步模型分析

starcoin 异步模型分析

基于 actix

官方的 actix

首先应该先根据官方的文档,熟悉基本的 actix 框架,包括:actor,address,contex,arbiter 和 sync arbiter 这些基本概念。

官方文档和示例代码见:Quick start | Actix Web

starcoin 中基于 actix 的异步消息处理框架

综述

starcoin 的异步消息框架是对 actix 进行了一层封装,主要思想是:一个服务一个线程。

ServiceActor<S>

其 impl 了官方的 Actor 和 Handle trait,也就是所有的异步消息都会走 ServiceActor<S>,其中 S 是我们应用层真正实现业务的类,例如,如果我们想新建一个 XXX service,那么只需要:

 // 定义service 和 创建工厂类  // impl ActorService trait,这样才可以给到 ServiceActor 中的 proxy 控制  // impl EventHandler<S, E> 和 ServiceHandler<S, R> 处理消息  struct XXXService;  ​  // 实现 ServiceFactory<S>::create 函数,生成 serivce  struct XXXServiceFactory;  ​  // 使用,在 init_system() 函数中,通过 registry 对象进行注册启动 service  registry.register_by_factory::<XXXService, XXXServiceFactory>().await?; // 此处会开启一个线程处理XXX Service 感兴趣的消息

因此,ServiceActor<S> 主要还是用于完成对 Actor 的封装,黏合 Actor 和starocin service。

ServiceActor<S> 内部主要封装了一个 proxy (ServiceHandlerProxy<S>),它封就是我们的服务 proxy,任何 request 都是通过这个 proxy 类再分发到对应的 service 上去的。这个属于 starcoin 基于 actix 的异步消息的内部实现,一般是感知不到。

EventHandler<S, M> 和 ServiceHandler<S, R>

starcoin 的消息分为 event 和 service 两种,本质上没啥不同,都是 actix 消息,前者应该是用于事件监听(例如开始同步,结束同步),没有返回值,后者是用于消息发送,有返回值。我们定义 XXXService 后可以 impl 这两个 trait,其中 S 是我们的 service, M 和 R 是我们自定义的 message struct。实现 handle 函数处理发来的消息。

ServiceContext<'a, S>

我们还会遇到 service conntext 类,这个类其实就是封装了 actix 的context,即可以通过这个 context 获得 service 的地址,启动,重启 service 本身。任何 service 都可以在 handle 消息的时候拿到这个 context。

比 actix 更多的是,它还封装了一个 service cache,里面放了两样东西:

1、一个特殊的 service: register serice;

2、其它对象的引用(放在 box 中),包括:配置,其它 service的等,用 any 这个类型放的;

ServiceRef<S>

其实就是封装了 actix 的 address,用于给服务发消息,还可以启动停止通知等操作,因为 service 之间经常交互发消息,因此会比较常用。

一个特殊的service:RegistryService

RegistryService 比较特殊,它是第一个创建的 service,它的任务就是创建一个线程(用 actix 的 arbiter)其它 service,并把其它 service 放到它的 context 中缓存起来,而且它也存放了一些全局属性,比如 node config,vm config,甚至 genesis 等对象。

主要有几个关键的功能:

函数名(同步)/消息(异步)

作用与实现

函数名(同步)/消息(异步)

作用与实现

register/RegisterRequest

创建一个线程,拉起一个 service

register_by_factory(这是一个asyn 函数仅支持异步)

同 register,但需要设计 service 方提供一个 factory类(实现ServiceFactory<S> 这个 trait,S 是 service 类)

get_shared/GetShardRequest

获取任何对象,一般是全局唯一的对象,比如配置,genesis,serivice 对象。里面用 any 存放

put_shared/PutShardRequest

存放任何对象,一般是全局唯一的对象,比如配置,genesis,serivice 对象。里面用 any 存放

service_ref/ServiceRefRequest

获得某个 service 的 ServiceRef,即 address

这个 register serivice 的 address 会被放在 context 中,即每个 service 都可以引用它,给它发消息进行交互(异步使用),也可以同步调用它直接的实现函数(同步使用)。因此,RegistryService 相当于一个全局 service 容器,通过它(异步给它发消息 ServiceRefRequest<S>,同步直接调用 get_shared),可以获得其它 service 的 ServiceRef<S> 这样就可以和其它服务交互了。其关键代码见:commons/service-register/src/service_registry.rs。

对 RegistryService 对象的初始化构建流程目前都集中在启动过节点时的 init_system 函数 (node/src/node.rs)中。

调试方法与经验

如果是同步调用,比较简单,如平常一样流水线般跟踪即可。

如果是异步调用:可以跟踪 request 的处理,看哪里处理了 request(即看 impl 了event handle或者 service handle 的地方)。

 

基于 futures

futures 的官方文档

基于 futures 的比较复杂,官方文档和例子比较少,也难懂,所以建议入门可以参考 tokio 的官方文档熟悉一下(相对 futures 简单):

Tutorial | Tokio - An asynchronous Rust runtime

入门后再看 futures 的官方文档:

futures - Rust

可以仔细阅读注释(文档)来学习 futures。

理解 async 和 await

tokio 的官方文档有很详细很生动的解释 async 和 await,建议跟着敲一遍里面的代码体会一下 async 和 await,这里简单总结一下:

async:即生成一个 future,如果函数被定义为 async 函数,则说明函数返回值是一个 future;一个 lambda 被定义为 async,也变成 future。

await:一个 future 对象的特殊调用,即尝试执行一个 future,若 future 因为被挂起,则等待唤醒,唤醒后,继续往下执行。重点理解这篇文章:Async in depth | Tokio - An asynchronous Rust runtime 。简答的说,即:调用 await 会触发 future 类的 poll 调用,若 poll 函数返回 Ready 状态,那么 await 就返回,且带上 Ready 关联的返回值。若 poll 函数返回 Pending 状态,那么 await 继续挂起,即阻塞在 await 中,直到 有人调用 poll 中的 conntext waker 的 wake 函数,此时 poll 函数会重新被拉起,看 await 是否又可以继续执行:

了解了基本的 future await 概念后就可以去看 futures的 stream 了。

 

stream 异步编程

Stream

futures 提供了一种特殊的异步模型,即 stream,它是基于 future 异步(注意 futures 和 future,前者是一个crate,后者是一个类)类实现的。

stream 即流,类似比较熟悉的概念即 iterator,即 iterator 有的函数 stream 也有(比如,map,collect 等),也就是说 stream 一种集合,如果我们看 stream 的定义,会发现有一个和 next(iterator 的函数) 和 poll(future 的函数) 类似的函数:

  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

如果我们调用了stream 的 await,那么它的 poll_next 就会被触发,其返回也是 Ready 或者 Penndig,关联集合中的某一项(Item)。当所有的集合元素都被 poll_next 一遍后,就返回 Ready(None)。相比之前 for 循环同步遍历 iterator,stream 这些动作都是异步执行,如下(伪代码):

 let s = stream::new();  while let some(item) = s.await { // await 触发 next_poll 调用,有 item 则返回,没 item 则挂起等待    println!(item);  }

特别的,若 stream 中的 item 都是 future 的话,那么我们就可以实现一种异步批量执行 future 的异步模型:

如果有一个任务 task,它包含了若干个子任务 sub tabsk,且需要异步执行,那么,此时可以用 stream 来控制,例如第一次调用 poll_next 生成第一个子任务,子任务结束的时候 wake 起 stream 继续 poll next,直到执行结束。这里的子任务即 future。

简单说,就是相比单个 future, stream 实现了批量异步执行子任务。如下(伪代码):

 

 

iter 函数 和 buffered 函数

实际上,stream 并不是异步类,即没有 stream.await 这样的调用。想获得异步对象,需要调用 stream 的 iter 或者 buffered 函数。

1、stream 的 item 不是 future 对象,使用 iter,iter 函数返回 Collect 对象;它帮我们实现了异步 await item 的流程,关键源代码如下:

2、stream 的 item 是future 对象,使用 buffered,buffered 函数返回 Buffered 对象;它帮我们实现了启动 future await 的流程:

可以看到,poll_next 中会 while 遍历 future,然后若这些 future 上一次都是返回 ready,则放入队列中,然后依次触发队列中的 future(poll_next_unpin会调用到 future 的 poll)。

以上可以参看我的例子中的 test_stream_vec 和 test_stream 两个测试函数:

myactix/src/main.rs at master · jackzhhuang/myactix

当然,后面方便讨论还是会说 stream.await,知道需要使用 iter 或者 buffered 来启动异步遍历流程就好。

Sink

stream 的 iter 和 buffer 函数返回 future 异步的去遍历 item,但其返回是所有的 item 集合,是一次性的,也就是 stream 没有给我们展现中间过程的异步处理。如果我们想每处理一个 item 就做一些事情,就需要 sink trait。sink 可以让我们监控整个 stream 内部遍历 item 的动作,翻看 sink 的 trait 声明会看到:

有了 sink,就可以监控对 item 的处理,例如(伪代码):

 

starcoin 的 stream-task 异步框架

了解完官方的 futures 后可以开始看 starcoin 封装的 stream task 框架了。

综述

实现了一个异步任务框架,只需要实现 TaskState 和 Generate 这两个 trait 就可以异步执行并在错误的情况下可以重试。

trait TaskState

即 FutureTaskStream<S> 里面的 S。封装 future,在 new_sub_task 中返回当前的 future,next 则负责返回下一个 future,在 stream 中依次执行。

TaskResultCollector<Item>

即 collector,在 FutureTaskSink<Item, Output> 中收集 future 返回的结果。主要有两个函数需要实现:

实际上,整个异步框架了解以上两个类就足够,以下几个主要用于辅助以上两个类运作。

FutureTaskStream<S>

即 stream 类,我们在里面封装了 TaskState,stream.await 调用的时候,就会在 poll_next 里面调用 TaskState 的 new_sub_task 并返回 future 对象,最终异步执行 TaskState 的 future。每次执行完,都会调用 TaskState 的 next 函数,next 函数会根据当前 TaskState 的执行状态生成新的 TaskState 供 stream 执行。

trait Generator 和 struct TaskGenerator<S, C>

generate 函数主要拉起了整个异步流程,即初始化 TaskState,stream 和 sink。其把这些流程也打包到一个 future 中,然后调用放会 await 拉起异步 stream 流程。

TaskFuture<Output>

由 Generator 返回,对 future 进行了封装,可以获得 future 的 handle,与 TaskState 不同的是,TaskState 是用于批量执行子任务的,而 TaskFuture<Output> 更像是一个单独的 future 封装。

FutureTaskSink<Item, Output>

主要是用于收集 Output,Output 是一个 TaskResultCollector<Item, Output = Output> trait bound 的类(即代码中的 collector),用于存储 TaskState 的结果。其做法是在 FutureTaskSink<Item, Output>  创建的时候(new 方法),创建一个 channel,获得 sender 和 receiver,sink 每处理一个 future 的结果(item),就在 start_send 中 send 给 receiver,receiver 就会调用 collector 存储结果。(有一个细节,sink 创建 collect 线程的时候会 let receiver = receiver.fuse(),其目的是所有 stream 的 future 都返回结果的时候才开始处理结果)

UML

从精简的伪代码理解

 

Related content

区块交易执行流程
区块交易执行流程
More like this
starcoin代码概要
starcoin代码概要
More like this
区块同步代码分析
区块同步代码分析
Read with this
NameService v0.1 接口文档
NameService v0.1 接口文档
More like this
Starcoin-Framework 升级
Starcoin-Framework 升级
Read with this
starcoin-framework v12讨论
starcoin-framework v12讨论
More like this