我们希望听到您的声音!帮助我们洞察 Ansible 生态系统的现状。
参加 Ansible 项目调查 2024

创建自定义事件驱动 Ansible 源插件

创建自定义事件驱动 Ansible 源插件

我们被包围了!我们现代的系统和应用程序不断地生成事件。这些事件可能是由服务请求、应用程序事件、健康检查等生成的。凭借围绕我们所做的一切的事件流量中的大量信息,事件驱动 Ansible 允许对传入事件做出自动响应。

但我们不仅完全沉浸在事件数据中,而且还沉浸在事件源中。想一想您的组织,甚至您的家庭,考虑一下有多少设备或应用程序正在生成数据,如果您能够轻松地收集这些数据,这些数据就可以使用。

事件驱动 Ansible 中的事件源插件充当 Ansible 和事件生成应用程序和服务之间的桥梁。事件驱动 Ansible 已经有一批事件插件可以从各种来源获取事件。但是,如果您的源插件不在该列表中怎么办?或者,如果您是希望将事件驱动 Ansible 连接到您自己的解决方案的 Red Hat 合作伙伴,该怎么办?好消息是,为事件驱动 Ansible 开发事件源插件可能是一个相对轻松的任务。

什么是源插件?

事件驱动 Ansible 利用规则手册来规范对事件的响应。规则手册将源、条件和操作组合在一起。根据来自源的一个或多个事件条件执行操作。事件源插件允许规则手册从云服务、应用程序和代理等接收事件。如果没有事件源,则不会接收事件,也不会执行操作。

事件源是包含在 Ansible 内容集合中的 Python 脚本。在规则手册中,事件源按名称调用,并且包含在规则手册源配置中的参数被传递到事件源插件中。在事件源插件中,例程应编写为异步以防止阻塞,允许尽可能有效地在多个事件源中接收和处理事件。出于这个原因,您会注意到所有最初的源插件(如 Kafka 和 Webhook)都利用了异步 I/O 范式。

源插件指南

对新的事件源插件进行范围界定应该很简单。因此,插件没有很多要求。要开始插件开发,以下是一些有关源插件的指南

  1. 源插件**必须**包含一个特定的入口点。
  2. 每个源**必须**具有嵌套键,这些键与主函数期望的参数匹配。
  3. 源插件应该使用预期的目的、预期的参数和规则手册示例进行文档化。
  4. 事件源插件应该在集合中分发。
  5. Python 例程应编写为非阻塞或异步。
  6. 源插件应该包含一种方法来测试 Event-Driven Ansible 之外的插件。

为了演示其中一些指南,我将使用我创建的一个示例源插件。我的源插件称为 new_records,它监视 ServiceNow 中的一个表,以查找要创建的新记录(例如,新的事件、问题和变更请求)。如果您想自己测试此源插件,您将需要一个 ServiceNow 实例,您可以将其作为ServiceNow 开发人员计划的一部分进行配置。

在您开始测试我的示例插件之前,请注意,此插件来自一个水平不高的 Python 人员,它仅用作示例,绝不推荐或建议用于生产环境。ServiceNow 实例还具有针对 REST 资源的速率限制规则,您可能会因过于频繁地轮询而遇到这些规则。考虑到事件推送范例是事件驱动 Ansible 源插件的首选范式,此源插件的更好实现可能是创建一个 ServiceNow Web 服务,将事件详细信息推送到事件聚合器!在这种情况下,我们的集成应用程序(ServiceNow)将事件详细信息推送到 JetStream 或 Kafka 之类的东西(为此已经有一个事件源插件!)。

源插件必须包含一个特定的入口点。

源插件需要一个非常特定的入口点配置。此入口点表示 Python 脚本中将由 ansible-rulebook 调用的函数,ansible-rulebook 是事件驱动 Ansible 中负责执行规则手册的组件。让我们看看我的 ServiceNow 自定义源插件的开头

import asyncio
import time
import os
from typing import Any, Dict
import aiohttp

# Entrypoint from ansible-rulebook
async def main(queue: asyncio.Queue, args: Dict[str, Any]):

在插件开头的所有导入语句之后,您可以看到入口点是一个名为 main 的异步函数,它接受两个参数。第一个参数是一个 asyncio 队列,当此源在规则手册中使用时,将由 ansible-rulebook 使用。第二个参数创建一个参数字典,我的特定源插件需要使用这些参数与我的 ServiceNow 实例建立连接。此字典将包含诸如用户名、密码和我的 ServiceNow 实例的 URL 等内容。就入口点而言,这确实是所有期望的内容。

每个源必须具有嵌套键,这些键与主函数期望的参数匹配。

这是一种稍微复杂的说法,即我在自定义 ServiceNow 事件插件中需要的参数也应该是用于配置源插件的规则手册中的键。为了演示这一点,请查看规则手册中我的自定义插件的源配置,然后查看 ansible-rulebook 执行的 main 函数期望的参数

规则手册示例

- name: Watch for new records
  hosts: localhost
  sources:
    - cloin.servicenow.new_records:
            instance: https://dev-012345.service-now.com
            username: ansible
            password: ansible
            table: incident
            interval: 1

插件代码

# Entrypoint from ansible-rulebook
async def main(queue: asyncio.Queue, args: Dict[str, Any]):

    instance = args.get("instance")
    username = args.get("username")
    password = args.get("password")
    table   = args.get("table")
    query   = args.get("query", "sys_created_onONToday@javascript:gs.beginningOfToday()@javascript:gs.endOfToday()")
    interval = int(args.get("interval", 5))

需要注意的是,如果您担心使用凭据或其他敏感参数来分发规则手册,ansible-rulebook 也接受在 vars 文件中设置的变量,或分别使用--vars--env-vars从环境变量中设置的变量。这意味着您的规则手册源配置可能看起来更像

- name: Watch for new records
  hosts: localhost
  sources:
    - cloin.servicenow.new_records:
        instance: {{ SN_HOST }}
        username: {{ SN_USERNAME }}
        password: {{ SN_PASSWORD }}
        table: incident
        interval: 1

源插件应该使用目的、预期的参数和规则手册示例进行文档化。

这有点像一个不言而喻的道理,即使我这个水平不高的 Python 开发人员也能理解。事实上,这实际上是我 2023 年的新年愿望之一。请查看我的源插件顶部的示例

"""
new_records.py

Description:
event-driven-ansible source plugin example
Poll ServiceNow API for new records in a table
Only retrieves records created after the script began executing
This script can be tested outside of ansible-rulebook by specifying
environment variables for SN_HOST, SN_USERNAME, SN_PASSWORD, SN_TABLE

Arguments:
  - instance: ServiceNow instance (e.g. https://dev-012345.service-now.com)
  - username: ServiceNow username
  - password: ServiceNow password
  - table:  Table to watch for new records
  - query:  (optional) Records to query. Defaults to records created today
  - interval: (optional) How often to poll for new records. Defaults to 5 seconds

Usage in a rulebook:
- name: Watch for new records
  hosts: localhost
  sources:
    - cloin.servicenow.new_records:
            instance: https://dev-012345.service-now.com
            username: ansible
            password: ansible
            table: incident
            interval: 1
  rules:
    - name: New record created
      condition: event.sys_id is defined
      action:
            debug:
"""

这是一个相当合理的指南,对吧?文档非常清楚地说明了这是一个事件驱动 Ansible 插件,插件预期能做什么,插件接受的参数以及如何在规则手册中使用此插件。

事件源插件应该在集合中分发。

Ansible 内容集合代表 Ansible 内容可以轻松分发的模型。通常,这些集合包含诸如插件、角色、剧本和文档之类的东西,并展示了 Ansible 的可扩展性。事件源插件和规则手册成为可以通过 Ansible 内容集合分发的额外内容类型。这在我的插件文档中展示如下

- name: Watch for new records
  hosts: localhost
  sources:
    - cloin.servicenow.new_records:
            instance: https://dev-012345.service-now.com

Python 例程应编写为非阻塞或异步。

异步模型表示,例如,new_records 源插件对 ServiceNow API 的请求不应阻塞或减慢对另一个源插件的另一个 API 的请求。通过在插件中使用 asyncio 以及 async 和 await,我们只需暂停该例程并等待结果,而不是阻止其他例程执行。如果您将两个仅使用同步例程编写的源插件组合到同一个规则手册中,您可能会发现您的规则手册执行缓慢,或者在事件发生后很久才对事件做出反应。以下是我源插件中的示例

            async with session.get(f'{instance}/api/now/table/{table}?sysparm_query={query}', auth=auth) as resp:
                if resp.status == 200:

                    records = await resp.json()
                    for record in records['result']:
…
                      await queue.put(record)

注意关键字 **async** 和 **await**。**async** 关键字让 Python 知道此协程将在事件循环中异步执行,同时等待 **await** 关键字指定的结果,在本例中为 ServiceNow API 调用的响应。

值得一提的另一行是上面代码段中 **queue.put(record)** 的最后一行 **await**。这至关重要,因为这是记录可以被规则手册引擎使用的方式。通过将 ServiceNow API 返回的记录放入队列,我们能够根据 API 请求返回的记录执行在规则手册中定义的操作。

源插件应该包含一种方法来测试 Event-Driven Ansible 之外的插件。

这其实并不是创建源插件的硬性规定。我认为它在插件开发过程中更有帮助,可能更像是一种最佳实践或一般提示,而不是其他任何东西。通过包含一个函数,该函数仅在通过运行直接调用脚本时运行(例如:python new_records.py),您可以在设置规则手册和启动 ansible-rulebook 之前快速测试对脚本的更改。对于我的示例插件,我使用以下内容

# this is only called when testing plugin directly, without ansible-rulebook
if __name__ == "__main__":
    instance = os.environ.get('SN_HOST')
    username = os.environ.get('SN_USERNAME')
    password = os.environ.get('SN_PASSWORD')
    table   = os.environ.get('SN_TABLE')

    class MockQueue:
        async def put(self, event):
            print(event)

    asyncio.run(main(MockQueue(), {"instance": instance, "username": username, "password": password, "table": table}))

如果您查看该代码示例,您会看到一个注释,说明这实际上只是为了直接测试 Python 脚本。如果您想自己测试此代码,您可以定义四个环境变量(例如 export SN_TABLE=incident...),然后执行脚本。从那里,打开您的 ServiceNow 实例并在您正在监视的表中创建一个新记录(在 SN_TABLE=incident 的情况下,您需要创建一个新的事件),然后您会看到脚本打印出新创建的记录。