Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
重要
对事件钩子的支持处于 公共预览版。
可以使用 事件挂钩 添加自定义 Python 回调函数,这些函数在事件持久保存到管道的 事件日志时运行。 可以使用事件挂钩来实现自定义监视和警报解决方案。 例如,当发生特定事件或与第三方解决方案集成以监视管道事件时,可以使用事件挂钩发送电子邮件或写入日志。
使用接受单个参数的 Python 函数定义事件挂钩,其中该参数是表示事件的字典。 然后,将事件挂钩包含到管道的源代码中。 管道中定义的任何事件挂钩都将尝试处理每个管道更新期间生成的所有事件。 如果管道由多个源代码项目组成,例如多个笔记本,则所有定义的事件挂钩都应用于整个管道。 尽管事件挂钩包含在管道的源代码中,但它们不包括在管道图中。
可以将事件挂钩与发布到 Hive 元存储或 Unity 目录的管道配合使用。
注释
- Python 是唯一支持用于定义事件挂钩的语言。 若要定义处理使用 SQL 接口实现的管道中的事件的自定义 Python 函数,请在作为管道一部分运行的单独 Python 笔记本中添加自定义函数。 当管道运行时,Python 函数将应用于整个管道。
- 事件挂钩仅在maturity_level为
STABLE
的情况下触发。 - 从管道更新异步执行事件挂钩,但与其他事件挂钩同步执行。 这意味着一次只运行一个事件挂钩,其他事件挂钩等待运行,直到当前正在运行的事件挂钩完成。 如果事件挂钩无限期运行,它将阻止所有其他事件挂钩。
- Lakeflow 声明性管道尝试在管道更新期间触发的每个事件上运行每个事件钩子。 为了帮助确保滞后的事件挂钩能够有时间处理所有排队的事件,Lakeflow 声明性管道会在终止运行管道的计算任务之前等待一个固定的不可配置时段。 但是,不能保证在终止计算之前在所有事件上触发所有挂钩。
监视事件挂钩处理
在 Lakeflow 声明性管道的事件日志中使用 hook_progress
事件类型来监视更新的事件钩子的状态。 为了防止循环依赖项,不会为 hook_progress
事件触发事件钩子。
定义事件钩子
若要定义事件挂钩,请使用 on_event_hook
修饰器:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures
描述事件挂钩被禁用之前可能失败的最大连续次数。 事件挂钩失败定义为每当事件挂钩引发异常时。 如果禁用事件挂钩,则在重启管道之前,它不会处理新事件。
max_allowable_consecutive_failures
必须是大于或等于 0
或等于 None
的整数。 值 None
(默认分配)表示事件挂钩允许的连续失败数没有限制,并且永远不会禁用事件挂钩。
事件挂钩失败和禁用事件挂钩可以在事件日志中作为 hook_progress
事件进行监视。
事件挂钩函数必须是一个 Python 函数,它只接受一个参数,即触发此事件挂钩的事件的字典表示形式。 将忽略事件挂钩函数中的任何返回值。
示例:选择要处理的特定事件
以下示例演示了一个事件挂钩,该挂钩选择要处理的特定事件。 具体而言,此示例会等待管道 STOPPING
事件收到,然后将消息输出到驱动程序日志 stdout
。
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
示例:将事件挂钩配置为在连续四次失败后禁用
以下示例演示如何配置一个事件挂钩,当连续失败四次时被禁用。
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
示例:带有事件挂钩的 Lakeflow 声明式管道
以下示例演示如何向管道的源代码添加事件挂钩。 这是在管道中使用事件挂钩的简单但完整的示例。
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})