本文发布于Cylon的收藏册,转载请著名原文链接~
工作流模型是指 Stackstorm 中 Orquesta 工作流的定义,包含工作流的执行方式,也可以理解为工作流模型就是 Workflow DSL 定义任务执行的有向图
工作流模型
下表是工作流模型 (Workflow Model) 的属性。工作流接受 Input,按预定义顺序执行一组任务 (Task),并返回输出 (Output)。此处的工作流模型是一个有向图 (directed graph),其中 Task 是节点,Taskt 之间的转换及其条件形成边。组成工作流的任务将在 DSL 中定义为名为 Task 的字典, 其中 Key 和 Value 分别是任务名称和任务模型。
Attribute | Required | Description |
---|---|---|
version | Yes | The version of the spec being used in this workflow DSL. |
description | No | The description of the workflow. |
input | No | A list of input arguments for this workflow. |
vars | No | A list of variables defined for the scope of this workflow. |
tasks | Yes | A dictionary of tasks that defines the intent of this workflow. |
output | No | A list of variables defined as output for the workflow. |
下面示例是一个 Workflow 的 DSL 定义,说明基础的工作流模型中各个部分的组成
version: 1.0
description: A simple workflow.
# 字符串列表,假设将在运行时提供值或键值对,其中当运行时未提供值时,值使用默认值。
# 这些属性的定义在 Action 的元数据文件中定义,包含默认值
input:
- arg1
- arg2: abc
# 一组键值对的变量,可以用于存储数据
vars:
- var1: 123
- var2: True
- var3: null
# 由字典组成任务定义,执行任务顺序由入栈任务转换和出栈条件组成
tasks:
# 定义两个任务,next为下一个执行的任务
task1:
action: core.noop
next:
- do: task2
task2:
action: core.noop
# 要输出的键值对列表
output:
- var3: <% ctx().arg1 %>
- var4:
var41: 456
var42: def
- var5:
- 1.0
- 2.0
- 3.0
with-item 模型
with-item 模型 是workflow 批量处理任务的一种方式,with-item 将遍历每个 item,然后作为参数传给 Action,默认情况下,所有 Item 将同时处理。当配置 concurrency 指定时,将处理最多 concurrency 值的 item 数,其余项目将排队等待执行,当 Item 的 Action 执行完成后,将处理列表中的下一个item。
任务结果是按照与item相同的顺序排列的Action执行结果的列表。所有Action执行必须成功完成,任务才能达到成功状态。如果一个或多个Action执行失败,则该任务将为失败状态。
当有取消或暂停工作流的请求时,任务将分别处于取消或暂停状态,直到执行过程中的所有Action执行完成。一旦这些Action执行完成时,任务将分别进入取消或暂停状态。如果指定了任务的 concurrency 并且还有剩余item,则不会请求新的 Action 执行。当暂停的工作流程恢复时,任务将继续处理剩余的 Item。
Item 可配置参数如下:
Attribute | Required | Description |
---|---|---|
items | Yes | The list of items to execute the action with. |
concurrency | No | The number of items being processed concurrently. |
一个简单的with-item 模型示例
以下是一个简单示例,其中包含任务中定义的单个 Item 列表。该任务会收到一个要回显的消息列表。对于不需要并发的项目 Item,有一个速记符号可以将列表直接传递给 with 语句。可以使用 item 函数将各个项目作为输入传递到 Action 中以供执行。
version: 1.0
input:
- messages
tasks:
task1:
with: <% ctx(messages) %>
action: core.echo message=<% item() %>
当需要并发执行时,使用 with-items 的 schema 去定义 ,如下所示
version: 1.0
input:
- messages
tasks:
task1:
with:
items: <% ctx(messages) %>
concurrency: 2
action: core.echo message=<% item() %>
进阶:为Item命名
Item 也可以被命名,下面示例时和上面相同功能的 workflow,但他为 items 使用 “message” 的作为名称进行标注。在标记时,指定了 message in <% ctx(messages) %>
进行命名,这里 item 被指定为 “message”,在引用时,item 函数也必须指定名称 item(message)
,这种场景返回的就不是列表,而是一个字典,类似 {"message": "value"}
在处理多个项目时比较有用。
version: 1.0
input:
- messages
tasks:
task1:
with: message in <% ctx(messages) %>
action: core.echo message=<% item(message) %>
为多个Item命名
在执行 Action 时,可以将多个 Item 作为 input 传入到 Action中,这里就利用到 item 命名的方式与另外 stackstrom 提供的 zip 函数。zip 与 python zip 相同,将多个元组进行迭代,然后将他们(多个元组相同下标项)的每项压缩为一个元组。
个元组进行迭代,然后将他们的每项压缩为一个元组。
version: 1.0
input:
- hosts
- commands
tasks:
task1:
with: host, command in <% zip(ctx(hosts), ctx(commands)) %>
action: core.remote hosts=<% item(host) %> cmd=<% item(command) %>
上面示例为,input 有两个列表,hosts 和 commands,这里使用 zip 压缩后将便成为 ,({hosts:xxx}, {commands:xxx})
这样在执行时通过 item 函数指定名称 item(host)
,就可以获取到对应的迭代器位的 host,通过这样的方式来进行多列表参数的传入。
下面是一个多参数 with-item 模型的执行示例
version: 1.0
description: A workflow demonstrating with items.
input:
- members
- test
tasks:
task1:
with: host, command in <% zip(ctx(members), ctx(test)) %>
action: core.echo message="<% item() %>, resistance is futile!"
output:
- items: <% task(task1).result.items.select($.result.stdout) %>
通过执行结果可以看出 multiple-Item 的机制
st2 execution get 664dd003169d72f36729cb70
id: 664dd003169d72f36729cb70
action.ref: frist.orquesta-with-items
parameters: None
status: succeeded (1s elapsed)
start_timestamp: Wed, 22 May 2024 10:59:15 UTC
end_timestamp: Wed, 22 May 2024 10:59:16 UTC
log:
- status: requested
timestamp: '2024-05-22T10:59:15.698000Z'
- status: scheduled
timestamp: '2024-05-22T10:59:15.813000Z'
- status: running
timestamp: '2024-05-22T10:59:15.865000Z'
- status: succeeded
timestamp: '2024-05-22T10:59:16.918000Z'
result:
output:
items:
- '{''host'': ''Lakshmi'', ''command'': ''t1''}, resistance is futile!'
- '{''host'': ''Lindsay'', ''command'': ''t2''}, resistance is futile!'
- '{''host'': ''Tomaz'', ''command'': ''t3''}, resistance is futile!'
+--------------------------+------------------------+-------+-----------+------------------------------+
| id | status | task | action | start_timestamp |
+--------------------------+------------------------+-------+-----------+------------------------------+
| 664dd004ed940d64824e33fb | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e33fe | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
| 664dd004ed940d64824e3402 | succeeded (0s elapsed) | task1 | core.echo | Wed, 22 May 2024 10:59:16 UTC|
+--------------------------+------------------------+-------+-----------+------------------------------+
Reference
[2] How many do you need? - Argo CD Architectures Explained
本文发布于Cylon的收藏册,转载请著名原文链接~
链接:https://www.oomkill.com/2024/05/stackstorm-sensors/
版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」 许可协议进行许可。