taskflow源码学习及使用

简介

taskflow是一个轻量级的图引擎,或者说是轻量级的tensorflow。
taskflow允许用户构建一个有向图,然后并行执行这个图,让图中相互依赖的节点串行执行,不依赖的节点并行执行。
这种方式既可视化了业务逻辑的依赖关系,又提升了整体执行的效率。
本文将先从源码的角度介绍taskflow库,然后基于taskflow开发一套企业级的策略引擎。

源码解读

taskflow库包括两个部分,图引擎和抢占式线程池。图引擎是taskflow的调度核心;抢占式线程池提升调度的性能。

抢占式线程池

抢占式线程池(work stealing thread pool)是一种多队列线程池,有较好的性能。
抢占式线程池包含一个全局队列,线程池中每个线程拥有一个自己的队列。当某个线程执行完自己队列中的任务后,就会从其他队列或者全局队列获取任务进行执行。

类似线程池还有两种,分别是单队列线程池和多队列线程池。单队列线程池中只有一个全局队列,所有的线程从这个队列中获取任务并进行执行。这种线程池的线程不会出现忙碌不均的情况,但是由于任务来自全局队列,增加的任务竞争,一定程度上影响了性能。多队列线程池中包含多个队列,每个线程拥有一个自己的任务队列。线程池分配接口将任务随机分配给本地队列,然后对应的线程去处理对应任务。这种线程池由于使用本地队列,减少了任务竞争,一定程度上提升了性能,但是本地队列长度不均使得部分线程极其忙碌,部分线程一直空闲,整体资源利用率不高。抢占式线程池的出现,结合了两者有优势,弥补了两者的不足,既提升了性能,又提升了资源利用率。

接着来讲一下taskflow库中抢占式线程池的实现。
它的实现在Executor,其成员变量如下:

  • _threads 线程组
  • _wsq 全局任务队列
  • _workers 线程上下文列表,类型是vector
  • _wids 线程号与上下文编号对应关系
  • _wsq 全局任务队列

Executor提供了一套run的接口,但是这些接口实际是将任务添加到队列中而已,线程组的线程会从全局队列中获取任务。

Worker是线程的上下文,成员变量包括:

  • _id 在_workers中的编号
  • _vtm 正在偷取的worker对应的在_workers中的编号
  • _rdgen 随机引擎,产生随机数
  • _wsq 本地任务队列

Executor中的线程又是如何配合的呢?? 当Executor执行graph时,会创建一个Topology对象(可以理解为graph的Session),并添加到graph的Topology队列中。如果这个队列只包含一个元素,则将这个Topology中的root节点添加到全局任务队列中(_wsq)。root节点的定义是,这些节点没有依赖节点。添加_wsq之后,唤醒相应数量的线程,这些线程从_wsq取任务。每个节点执行完毕后,会更新该节点的下游节点的状态,如果下游节点没有其他依赖节点,则将该节点添加到本地任务队列中。接着该线程继续执行本地任务队列,直到本地任务队列为空。

这时,开始偷窃逻辑了。_vtm初始化为_id值,当_vtm等于_id时,从全局队列中获取任务;如果没获得任务,则使用_rdgen随机一个值赋值给_vtm,_vtm对应的Worker对象的本地队列中获取任务;如果依旧没有获取任务,则再次进行随机;重试多次后没有获取任务,并且全局任务队列中有任务,则从全局任务队列中获取;如果还没有任务,先将_vtm还原为_id,然后进行等待。

本地队列的任务只会由当前线程添加,所以,当本地队列没任务之后,只能从其他队列或者全局队列获取,执行完任务才有可能有新就绪的任务。

Executor涉及两个组件,NotifierTaskQueue
TaskQueue是基于原子指针实现的一写多读循环队列,它使用TaskQueue::Array实现了一个环,然后使用一个指向头的原子变量和一个指向尾的原子变量,控制队列的出队和入队。
Notifier是从Eigen拷贝过来的,这里就不讲了。

图引擎

图引擎是taskflow库的核心,用户可以使用Taskflow对象创建一个有向图,然后使用Executor执行。
这个有向图执行时,相互依赖的节点串行执行,不相互依赖的节点并行执行。

图引擎主要涉及Taskflow、FlowBuilder、Topology、Graph、Node、Task类。
Node为图中的节点,记录与其他节点的依赖关系;Graph存储Node对象;FlowBuilder负责图构建;Taskflow为顶层图封装;
Topology用于基于图的一次执行。

使用Taskflow创建图时,Taskflow调用父类FlowBuilder的接口,将仿函数装入Node中,并将Node添加到Graph对象里,同时使用Task对象包裹Node类对象的指针。
用户使用Task的接口设置Node之间的依赖关系。

与tensorflow不同,Taskflow的Topology不记录每个节点的执行情况,依赖情况,仅仅记录本次执行的promise和需要等待多少个节点执行,而tensorflow的ExecutorState里记录了每个节点的依赖数量,并实时更新。

同一个graph多次执行时,每次执行生成一个Topology对象,添加到Taskflow对象的队列中。
graph被执行时会从这个队列里取第一个Topology,并初始化所有Node的依赖计数器。每个节点执行完毕,就会更新其下游节点的计数器。如果下游节点已经就绪,则添加任务队列中。

Node节点包含了大量的结构:

  • _name 节点名
  • _data 数据指针
  • _handle 节点类型句柄,包括静态任务、动态任务等等
  • _successors 下游节点列表
  • _dependents 上有节点列表
  • _topology 当前执行的session
  • _parent 父图节点
  • _state 执行状态
  • _join_counter 依赖节点数量
  • _semaphores 信号量

图执行时,会将依赖(_dependents)的节点添加就绪队列中;
当一个节点执行完毕,则会更新下游节点(_dependents)的依赖节点数量(_join_counter)。
如果某个节点的_join_counter为0,表示它依赖的节点都已经执行完毕,则达到就绪状态,可以开始执行了。

每次取出Topology对象时,都会对每个节点的_join_counter进行初始化,方便后续执行。
也由于这个设计,一个图(Taskflow对象)可以并行被调用多次,但是图的每次执行都是串行的,每次被调用就产生一个Topology,只有前一个Topology执行完毕,才会执行后一个Topology。

为了保障一个图能被并行执行,需要根据这个图创建多个Taskflow对象,每次执行使用不同的对象。
这种方式带来的问题,要么每次执行前构建一个图,然后执行;要么创建缓存队列,执行时从队列里取,执行完毕后再放回队列中,如果不够用则动态创建。
相对来说,还是后者性能更高一些。当然tensorflow就没有这个问题了,每次执行时创建一个executionStatus就行了。

Node的仿函数存储在handle_对象中,通过仿函数的参数提取机制,将仿函数装箱到对应的结构中,然后将这个对象塞到variant。调用时判断其类型,设置入参。

策略引擎

参考