Openstack 实现技术分解 (4) 通用技术 — TaskFlow

目录

  • 目录
  • 前文列表
  • 扩展阅读
  • 简介
  • 基本概念
  • 实现样例
  • 最后

前文列表

Openstack 实现技术分解 (1) 开发环境 — Devstack 部署案例详解
Openstack 实现技术分解 (2) 虚拟机初始化工具 — Cloud-Init & metadata & userdata
Openstack 实现技术分解 (3) 开发工具 — VIM & dotfiles

扩展阅读

TaskFlow 代码库
TaskFlow 文档

简介

TaskFlow is a Python library that helps to make task execution easy, consistent and reliable.
A library to do [jobs, tasks, flows] in a highly available, easy to understand and declarative manner (and more!) to be used with OpenStack and other projects.

简而言之, TaskFlow 能够控制应用程序中的长流程业务逻辑任务的暂停、重启、恢复以及回滚, 主要用于保证长流程任务执行的可靠性和一致性。

主要应用场景有如 Cinder 的 create volume 这般复杂、冗长、容易失败, 却又要求保持数据与环境一致的业务逻辑.

从 create volume 流程图看, Cinder 在 create_volume.py(cinder/volume/flows/manager/create_volume.py) 模块中定义了大量的 Tasks class 来组成 TaskFlow:

  • OnFailureRescheduleTask
  • ExtractVolumeRefTask
  • ExtractVolumeSpecTask
  • NotifyVolumeActionTask
  • CreateVolumeFromSpecTask
  • CreateVolumeOnFinishTask

如果在执行任务流的过程中失败了, TaskFlow 的回滚机制能够让程序流和执行环境回滚到初始状态, 并且可以重新开始执行.

总的来说, TaskFlow 非常适合于 面向任务 的应用场景.

基本概念

Atom: An atom is the smallest unit in TaskFlow which acts as the base for other classes
Atom: Atom 是 TaskFlow 的最小单位, 其他的所有类, 包括 Task 类都是 Atom 类的子类.

Task: A task (derived from an atom) is a unit of work that can have an execute & rollback sequence associated with it (they are nearly analogous to functions).
Task: task 是拥有执行和回滚功能额最小工作单元. 在 Task 类中开发者能够自定义 execute(执行) 和 revert(回滚) method.

Flow: Linear/Unordered/Graph
Flow: 在 TaskFlow 中使用 flow(流) 来关联各个 Task, 并且规定这些 Task 之间的执行和回滚顺序. flow 中不仅能够包含 task 还能够嵌套 flow. 常见类型有以下几种:

  • Linear(linear_flow.Flow): 线性流, 该类型 flow 中的 task/flow 按照加入的顺序来依次执行, 按照加入的倒序依次回滚.
  • Unordered(unordered_flow.Flow): 无序流, 该类型 flow 中的 task/flow 可能按照任意的顺序来执行和回滚.
  • Graph(graph_flow.Flow): 图流, 该类型 flow 中的 task/flow 按照显式指定的依赖关系或通过其间的 provides/requires 属性的隐含依赖关系来执行和回滚.

Retry: A retry (derived from an atom) is a special unit of work that handles errors, controls flow execution and can (for example) retry other atoms with other parameters if needed.
Retry: Retry 是一个控制当错误发生时, 如何进行重试的特殊工作单元, 而且当你需要的时候还能够以其他参数来重试执行别的 Atom 子类. 常见类型:

  • AlwaysRevert: 错误发生时, 回滚子流
  • AlwaysRevertAll: 错误发生时, 回滚所有流
  • Times: 错误发生时, 重试子流
  • ForEach: 错误发生时, 为子流中的 Atom 提供一个新的值, 然后重试, 直到成功或 retry 中定义的值用完为止.
  • ParameterizedForEach: 错误发生时, 从后台存储(由 store 参数提供)中获取重试的值, 然后重试, 直到成功或后台存储中的值用完为止.

Engine: Engines are what really runs your atoms.
Engine: Engines 才是真正运行 Atoms 的对象, 用于 load(载入) 一个 flow, 然后驱动这个 flow 中的 task/flow 开始运行. 我们可以通过 engine_conf 参数来指定不同的 engine 类型. 常见的 engine 类型如下:

  • serial: 所有的 task 都在调用了 engine.run 的线程中运行.
  • parallel: task 可以被调度到不同的线程中运行.
  • worker-based: task 可以被调度到不同的 woker 中运行.

实现样例

源码请浏览 Github

#!/usr/bin/env python
#filename: tasks.py

import taskflow.engines
from taskflow.patterns import linear_flow as lt
from taskflow import task
from taskflow.types import failure as task_failed

class CallJim(task.Task):

    default_provides = set(['jim_new_number'])

    def execute(self, jim_number, *args, **kwargs):
        print "Calling Jim %s." % jim_number
        print '=' * 10
        jim_new_number = jim_number + 'new'

        return {'jim_new_number': jim_new_number}

    def revert(self, result, *args, **kwargs):
        if isinstance(result, task_failed.Failure):
            print "jim result"
            return None

        jim_new_number = result['jim_new_number']
        print "Calling jim %s and apologizing." % jim_new_number

class CallJoe(task.Task):

    default_provides = set(['joe_new_number', 'jim_new_number'])

    def execute(self, joe_number, jim_new_number, *args, **kwargs):
        print "Calling jim %s." % jim_new_number
        print "Calling Joe %s." % joe_number
        print '=' * 10
        joe_new_number = joe_number + 'new'

        return {'jim_new_number': jim_new_number,
                'joe_new_number': joe_new_number}

    def revert(self, result, *args, **kwargs):
        if isinstance(result, task_failed.Failure):
            print "joe result"
            return None

        jim_new_number = result['jim_new_number']
        joe_new_number = result['joe_new_number']

        print "Calling joe %s and apologizing." % joe_new_number

class CallJmilkFan(task.Task):

    default_provides = set(['new_numbers'])

    def execute(self, jim_new_number, joe_new_number, jmilkfan_number,
                *args, **kwargs):
        print "Calling jim %s" % jim_new_number
        print "Calling joe %s" % joe_new_number
        print "Calling jmilkfan %s" % jmilkfan_number
        print '=' * 10
        jmilkfan_new_number = jmilkfan_number + 'new'

        raise ValueError('Error')
        new_numbers =  {'jim_new_number': jim_new_number,
                        'joe_new_number': joe_new_number,
                        'jmilkfan_new_number': jmilkfan_new_number}

        return {'new_numbers': new_numbers}

    def revert(self, result, *args, **kwargs):
        if isinstance(result, task_failed.Failure):
            print "jmilkfan result"
            return None

        jim_new_number = result['jim_new_number']
        joe_new_number = result['joe_new_number']
        jmilkfan_new_number = result['jmilkfan_new_number']

        print "Calling jmilkfan %s and apologizing." % jmilkfan_new_number

def get_flow(flow, numbers):
    flow_name = flow
    flow_api = lt.Flow(flow_name)

    flow_api.add(CallJim(),
                 CallJoe(),
                 CallJmilkFan())

    return taskflow.engines.load(flow_api,
                                 engine_conf={'engine': 'serial'},
                                 store=numbers)

def main():
    numbers = {'jim_number': '1'*6,
               'joe_number': '2'*6,
               'jmilkfan_number': '3'*6}
    try:
        flow_engine = get_flow(flow='taskflow-demo',
                               numbers=numbers)

        flow_engine.run()
    except Exception:
        print "TaskFlow Failed!"
        raise

    new_numbers = flow_engine.storage.fetch('new_numbers')

if __name__ == '__main__':
    main()

Output:

fanguiju@fanguiju:~/project/my-code-repertory/TaskFlow-demo$ python tasks.py
Calling Jim 111111.
==========
Calling jim 111111new.
Calling Joe 222222.
==========
Calling jim 111111new
Calling joe 222222new
Calling jmilkfan 333333
==========
jmilkfan result
Calling joe 222222new and apologizing.
Calling jim 111111new and apologizing.
TaskFlow Failed!
Traceback (most recent call last):
  File "tasks.py", line 114, in <module>
    main()
  File "tasks.py", line 105, in main
    flow_engine.run()
  File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/engine.py", line 159, in run
    for _state in self.run_iter():
  File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/engine.py", line 223, in run_iter
    failure.Failure.reraise_if_any(it)
  File "/usr/local/lib/python2.7/dist-packages/taskflow/types/failure.py", line 292, in reraise_if_any
    failures[0].reraise()
  File "/usr/local/lib/python2.7/dist-packages/taskflow/types/failure.py", line 299, in reraise
    six.reraise(*self._exc_info)
  File "/usr/local/lib/python2.7/dist-packages/taskflow/engines/action_engine/executor.py", line 82, in _execute_task
    result = task.execute(**arguments)
  File "tasks.py", line 66, in execute
    raise ValueError('Error')
ValueError: Error

NOTE 1: 在 function get_flow 中使用 linear_flow.Flow 生成一个 TaskFlow(线性任务流) 对象 flow_api , 再通过flow_api.add method 添加要 顺序执行且倒序回滚 的 Task class(CallJim/CallJom/CallJmilkFan).

NOTE 2: 使用 taskflow.engines.load method 来加载 TaskFlow(flow_api)对象/后台存储数据(store)/ engine配置 等信息并生成 Task Engine 对象.

NOTE 3: 最后调用 Task Engine 对象的 flow_engine.run method 来开始执行该任务流.

NOTE 4: 后台存储 store 的数据在该任务流中被所有 Task class 共享, 并且以 Task class 中的 execute method 的形参作为对接入口. e.g. 上述实现的 store 后台存储中含有 {jim_number: '1'*6}, 那么 CallJim 的 execute method 就可以通过形参 jim_number 来获取 '1'*6 的值.

NOTE 5: Task class 的属性 default_provides 用于声明在执行过程中新添到后台存储的元素的名称, 其相应的值会自动的从 execute method 返回值中匹配获取, 最终存储后台存储. e.g. CallJim 的属性 default_provides = set(['jim_new_number']) 其中 jim_new_number 的值会从 execute method 的返回 return {'jim_new_number': jim_new_number} 中获取.

NOTE 6: provides 的实现能够有效的帮助传递 Task class 之间在执行时产生的新属性对象. 将上一个 Task 的结果传递给后一个 Task 使用.

最后

当实现的 TaskFlow 中包含了多个 Task(的确可能存在只有一个 Task 的 TaskFlow) 时, 有两点是需要注意的:

  • 在使用线性流类型的 TaskFlow 时, Task class revert method 回滚的应该是上一个 Task class execute method 的业务. e.g. BTask.revert 应该回滚 ATask.execute, 因为只有在 ATask.execute 成功执行的前提之下才有 revert 的价值. 所以在 revert method 的定义中需要实现语句 if isinstance(result, task_failed.Failure): return None. 当一个 Task 的 execute method 执行失败时, 那么 revert method 接收的 result 实参就是 taskflow.types.failure.Failure 的实例对象.
  • 尽量让每个 Task class 都仅处理一件事情, 这是为了让每一次回滚都足够精准. e.g. 尽管创建虚拟机和开启虚拟机都同属于对虚拟机的操作, 但是我们仍然应该将两者各自定义一个 Task class. 假如启动虚拟机失败时, 我们只需再次重试启动虚拟机, 而无须再次重复创建虚拟机.
时间: 2024-09-16 01:18:16

Openstack 实现技术分解 (4) 通用技术 — TaskFlow的相关文章

OpenStack 实现技术分解 (6) 通用库 — oslo_log

目录 目录 前文列表 扩展阅读 日志级别 oslolog 初始化设置 DEMO oslolog 的相关配置项 oslolog 的日志级别 oslolog 的使用技巧 推荐使用 LOGdebug 的地方 推荐使用 LOGinfo 的地方 推荐使用 LOGexception 的地方 推荐使用 LOGerror 的地方 推荐使用 LOGcretical 的地方 前文列表 OpenStack 实现技术分解 (1) 开发环境 - Devstack 部署案例详解 OpenStack 实现技术分解 (2) 虚

OpenStack 实现技术分解 (7) 通用库 — oslo_config

目录 目录 前文列表 扩展阅读 osloconfig argparse cfgpy class Opt class ConfigOpts CONF 对象的单例模式 前文列表 OpenStack 实现技术分解 (1) 开发环境 - Devstack 部署案例详解 OpenStack 实现技术分解 (2) 虚拟机初始化工具 - Cloud-Init & metadata & userdata OpenStack 实现技术分解 (3) 开发工具 - VIM & dotfiles Open

OpenStack 实现技术分解 (5) 应用开发 — 使用 OpenStackClients 进行二次开发

目录 目录 前文列表 参考阅读 前言 OpenStackClients 使用 OpenStackClients 获取 project_client object 的 demo 调用 project_client object 实例方法实现对 project 操作的 demo 最后 前文列表 OpenStack 实现技术分解 (1) 开发环境 - Devstack 部署案例详解 OpenStack 实现技术分解 (2) 虚拟机初始化工具 - Cloud-Init & metadata & u

Openstack 实现技术分解 (1) 开发环境 — Devstack 部署案例详解

目录 目录 前言 系统环境 Devstack 下载源码 配置文件 localconf localrc 简易的环境脚本 openrc 部署 Devstack 自动化部署流程 部署案例 单节点 Nova-Network 模式部署 多节点 Nova-Network 模式部署 多节点 Neutron 模式部署 单节点 Neutron 模式部署 部署样例 使用 不使用 FIXED-IP 让 Instances 直接使用连接外网的网段 最后 ERROR 前言 在继上一个系列博文 << 用 Flask 来写

Openstack 实现技术分解 (3) 开发工具 — VIM &amp;amp; dotfiles

目录 目录 前文列表 扩展阅读 前言 插件管理 Vundle 主题 Solarized 浏览项目目录结构 Nerdtree Symbol 窗口 Tagbar 文件模糊查询 CtrlP 代码补全 YouCompleteMe 语法检查 Syntastic 通用配置 dotfiles 前文列表 Openstack 实现技术分解 (1) 开发环境 - Devstack 部署案例详解 Openstack 实现技术分解 (2) 虚拟机初始化工具 - Cloud-Init & metadata & us

Openstack 实现技术分解 (2) 虚拟机初始化工具 — Cloud-Init &amp;amp; metadata &amp;amp; userdata

目录 目录 前文列表 扩展阅读 系统环境 前言 Cloud-init Cloud-init 的配置文件 metadata userdata metadata 和 userdata 的区别 metadata 的服务机制 ConfigDrive Metadata RESTful 前文列表 Openstack 实现技术分解 (1) 开发环境 - Devstack 部署案例详解 扩展阅读 Documentation - Cloud-Init 0.7.9 documentation 系统环境 Devsta

浏览器 控件-浏览器调用本地程序的通用技术

问题描述 浏览器调用本地程序的通用技术 firefox,chrome,ie调用本地程序都可以用哪种方法,哪种适合这三种浏览器呢. 原来的支付宝控件安装后三种浏览器都可以用,现在主流浏览器慢慢不支持npapi,那么有什么替代技术可以实现么? 现在项目用activex,不过感觉使用起来不方便. 解决方案 如何让浏览器调用你的程序来打开资源如何让浏览器调用你的程序来打开资源浏览器中调用外部程序并传递参数

IT行业未来在于资源共享 需建“通用技术平台”

中介交易 SEO诊断 淘宝客 云主机 技术大厅 1月1日消息,据国外媒体报道,根据亚历克斯·史蒂夫( Alex Steffen)在其书<改变世界>中的预测,世界计算机容量在某个给定时刻的闲置率达到了令人震惊的85%.而且很难想象计算机服务器的容量每年仍以大概28%的速度增长,其中中央处理器的增长率为17%,内存的增长率为45%.因此,现在是时候进行跨组织合作,清除闲置资源了.这一战略被称为"通用技术平台"(technology commons),将通过建立技术"通

通用技术咨询公司牵手中投证券共创环境产业基金

中国通用技术集团旗下专业从事市政.环境咨询业务的全资子公司通用技术咨询公司与中国建银投资证券有限责任公司16日共同签署了合作协议.中投证券将协助通用技术发起设立环境产业基金,建立全面的战略合作伙伴关系. 据了解,双方将在环境产业基金.城市市政基础设施等多个领域发挥各自优势,加强双方的合作. 通用技术咨询公司董事长刘德冰表示,作为致力于环境产业及市政咨询的中央直属企业,通用技术与中投证券的合作将为环境产业基金的诞生和健康运作提供有力的支持,为环境保护及市政基础设施领域的投融资模式创新开拓出新的思路