Azure Databricks 上的 MLflow 模型注册表 Webhook

重要

此功能目前以公共预览版提供。

Webhook 使你能够侦听模型注册表事件,以便你的集成可以自动触发操作。 可以使用 Webhook 自动执行机器学习管道并将其与现有 CI/CD 工具和工作流集成。 例如,可以在创建新的模型版本时触发 CI 生成,或者在每次请求将模型过渡到生产版本时通过 Slack 来通知团队成员。

Webhooks 可以通过 Databricks REST APIPyPI 上的 Python 客户端 databricks-registry-webhooks 获得。

注意

使用 Unity Catalog 中的模型时,Webhook 不可用。 作为替代选项,请参阅我是否可以对事件使用阶段转换请求或触发 Webhook?。 不支持将 Webhook 发送到专用终结点(无法从公共 Internet 访问的终结点)。

Webhook 事件

你可以指定要在以下一个或多个事件时触发的 Webhook:

  • MODEL_VERSION_CREATED:为关联的模型创建了新的模型版本。
  • MODEL_VERSION_TRANSITIONED_STAGE:模型版本的阶段已更改。
  • TRANSITION_REQUEST_CREATED:用户请求了过渡模型版本的阶段。
  • COMMENT_CREATED:用户对已注册的模型写了评论。
  • REGISTERED_MODEL_CREATED:创建了一个新的注册模型。 只能为注册表范围的 Webhook 指定此事件类型,可以通过不在创建请求中指定模型名称来创建此事件类型。
  • MODEL_VERSION_TAG_SET:用户在模型版本上设置标记。
  • MODEL_VERSION_TRANSITIONED_TO_STAGING:模型版本已转换为暂存。
  • MODEL_VERSION_TRANSITIONED_TO_PRODUCTION:模型版本已过渡到生产。
  • MODEL_VERSION_TRANSITIONED_TO_ARCHIVED:模型版本已存档。
  • TRANSITION_REQUEST_TO_STAGING_CREATED:用户请求将模型版本转换为暂存。
  • TRANSITION_REQUEST_TO_PRODUCTION_CREATED:用户请求将模型版本转换为生产版本。
  • TRANSITION_REQUEST_TO_ARCHIVED_CREATED:用户请求存档模型版本。

Webhook 的类型

根据触发器目标,有两种类型的 Webhook:

  • 具有 HTTP 终结点的 Webhook(HTTP 注册表 Webhook):将触发器发送到 HTTP 终结点。
  • 具有作业触发器的 Webhook(作业注册表 Webhook):在 Azure Databricks 工作区中触发作业。 如果在作业的工作区中启用了 IP 允许列表,则必须将模型注册表的工作区 IP 加入允许列表。 有关详细信息,请参阅作业注册表 Webhook 的 IP 允许列表

还有两种类型的 Webhook,基于它们的作用域,具有不同的访问控制要求:

  • 特定于模型的 Webhook:Webhook 适用于特定的注册模型。 必须对已注册模型拥有 CAN MANAGE 权限才能创建、修改、删除或测试特定于模型的 Webhook。
  • 注册表范围的 Webhook:Webhook 由工作区中任何已注册模型上的事件触发,包括创建新的注册模型。 若要创建注册表范围的 Webhook,请在创建时省略 model_name 字段。 你必须具有工作区管理员权限才能创建、修改、删除或测试注册表范围的 Webhook。

Webhook 有效负载

每个事件触发器在向 Webhook 发出请求的有效负载中包含最少字段终结点。

  • 将排除敏感信息,如项目路径位置。 具有相应 ACL 的用户和主体可以使用客户端或 REST API 在模型注册表中查询此信息。
  • 有效负载不会被加密。 有关如何验证 Azure 数据砖是否为 Webhook 源的信息,请参阅安全性
  • text 字段有助于 Slack 集成。 若要发送 Slack 消息,请提供 Slack Webhook 终结点作为 Webhook URL。

作业注册表 Webhook 有效负载

作业注册表 Webhook 的有效负载取决于作业类型,将发送到目标工作区中的 jobs/run-now 终结点。

单任务作业

单任务作业具有三个有效负载中的一个,具体取决于任务类型。

笔记本和 Python wheel 作业

笔记本和 Python wheel 作业具有一个 JSON 有效负载,该有效负载附带一个包含字段 event_message 的参数字典。

{
  "job_id": 1234567890,
  "notebook_params": {
    "event_message": "<Webhook Payload>"
  }
}
Python、JAR 和 Spark 提交作业

Python、JAR 和 Spark 提交作业具有一个附带参数列表的 JSON 有效负载。

{
  "job_id": 1234567890,
  "python_params": ["<Webhook Payload>"]
}
所有其他作业

所有其他类型的作业具有一个不带参数的 JSON 有效负载。

{
  "job_id": 1234567890
}

多任务作业

多任务作业具有一个 JSON 有效负载,其中填充了所有参数以考虑到不同的任务类型。

{
  "job_id": 1234567890,
  "notebook_params": {
    "event_message": "<Webhook Payload>"
  },
  "python_named_params": {
    "event_message": "<Webhook Payload>"
  },
  "jar_params": ["<Webhook Payload>"],
  "python_params": ["<Webhook Payload>"],
  "spark_submit_params": ["<Webhook Payload>"]
}

示例有效负载

事件:MODEL_VERSION_TRANSITIONED_STAGE

响应

POST
/your/endpoint/for/event/model-versions/stage-transition
--data {
  "event": "MODEL_VERSION_TRANSITIONED_STAGE",
  "webhook_id": "c5596721253c4b429368cf6f4341b88a",
  "event_timestamp": 1589859029343,
  "model_name": "Airline_Delay_SparkML",
  "version": "8",
  "to_stage": "Production",
  "from_stage": "None",
  "text": "Registered model 'someModel' version 8 transitioned from None to Production."
}

事件:MODEL_VERSION_TAG_SET

响应

POST
/your/endpoint/for/event/model-versions/tag-set
--data {
  "event": "MODEL_VERSION_TAG_SET",
  "webhook_id": "8d7fc634e624474f9bbfde960fdf354c",
  "event_timestamp": 1589859029343,
  "model_name": "Airline_Delay_SparkML",
  "version": "8",
  "tags": [{"key":"key1","value":"value1"},{"key":"key2","value":"value2"}],
  "text": "example@yourdomain.com set version tag(s) 'key1' => 'value1', 'key2' => 'value2' for registered model 'someModel' version 8."
}

事件:COMMENT_CREATED

响应

POST
/your/endpoint/for/event/comments/create
--data {
  "event": "COMMENT_CREATED",
  "webhook_id": "8d7fc634e624474f9bbfde960fdf354c",
  "event_timestamp": 1589859029343,
  "model_name": "Airline_Delay_SparkML",
  "version": "8",
  "comment": "Raw text content of the comment",
  "text": "A user commented on registered model 'someModel' version 8."
}

安全性

出于安全考虑,Azure Databricks 在从有效负载计算的标头中,以及在与使用 HMAC 和 SHA-256 算法的 Webhook 关联的共享机密密钥中,包含 X-Databricks-Signature。

此外,你可以在传出请求中包含一个标准授权标头,为此,在 Webhook 的 HttpUrlSpec 中指定一个标头即可。

客户端验证

如果设置了共享机密,则有效负载接收者应通过使用共享机密对有效负载进行 HMAC 编码,然后将编码值与标头中的 X-Databricks-Signature 进行比较来验证 HTTP 请求的来源。 如果禁用了 SSL 证书验证(即,如果 enable_ssl_verification 字段设置为 false),这一点尤其重要。

注意

enable_ssl_verification 默认设置为 true。 对于自签名证书,此字段必须为 false,并且目标服务器必须禁用证书验证。

出于安全考虑,Databricks 建议你对有效负载的 HMAC 编码部分执行机密验证。 如果禁用主机名验证,那么请求可能被恶意路由到意外主机的风险会增加。

import hmac
import hashlib
import json

secret = shared_secret.encode('utf-8')
signature_key = 'X-Databricks-Signature'

def validate_signature(request):
  if not request.headers.has_key(signature_key):
    raise Exception('No X-Signature. Webhook not be trusted.')

  x_sig = request.headers.get(signature_key)
  body = request.body.encode('utf-8')
  h = hmac.new(secret, body, hashlib.sha256)
  computed_sig = h.hexdigest()

  if not hmac.compare_digest(computed_sig, x_sig.encode()):
    raise Exception('X-Signature mismatch. Webhook not be trusted.')

HTTP 注册表 Webhook 的授权标头

如果设置了授权标头,则客户端应通过验证授权标头中的持有者令牌或授权凭据来验证 HTTP 请求的来源。

作业注册表 Webhook 的 IP 允许列表

若要使用在启用了 IP 允许列表的其他工作区中触发作业运行的 Webhook,必须将 Webhook 所在的区域 NAT IP 列入允许列表,以接受传入请求。

如果 Webhook 和作业位于同一工作区中,则无需将任何 IP 加入允许列表。

如果作业位于 Azure 多租户区域中,请参阅 Azure Databricks 控制平面地址。 对于所有其他区域,请联系客户团队以确定需要加入允许列表的 IP。

审核日志

如果为工作区启用了审核日志记录,则审核日志中将包含以下事件:

  • 创建 Webhook
  • 更新 Webhook
  • 列出 webhook
  • 删除 webhook
  • 测试 webhook
  • Webhook 触发器

Webhook 触发器审核日志记录

对于具有 HTTP 终结点的 Webhook,将记录发送到为 Webhook 指定的 URL 的 HTTP 请求以及 URL 和 enable_ssl_verification 值。

对于带有作业触发器的 Webhook,将记录 job_idworkspace_url 值。

示例

本节包括:

HTTP 注册表 Webhook 示例工作流

1. 创建 Webhook

当 HTTPS 终结点准备好接收 Webhook 事件请求时,你可以使用 Webhook Databricks REST API 创建 Webhook。 例如,Webhook 的 URL 可以指向 Slack,以将消息发布到通道。

$ curl -X POST -H "Authorization: Bearer <access-token>" -d \
'{"model_name": "<model-name>",
  "events": ["MODEL_VERSION_CREATED"],
  "description": "Slack notifications",
  "status": "TEST_MODE",
  "http_url_spec": {
    "url": "https://hooks.slack.com/services/...",
    "secret": "anyRandomString"
    "authorization": "Bearer AbcdEfg1294"}}' https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/create
from databricks_registry_webhooks import RegistryWebhooksClient, HttpUrlSpec

http_url_spec = HttpUrlSpec(
  url="https://hooks.slack.com/services/...",
  secret="secret_string",
  authorization="Bearer AbcdEfg1294"
)
http_webhook = RegistryWebhooksClient().create_webhook(
  model_name="<model-name>",
  events=["MODEL_VERSION_CREATED"],
  http_url_spec=http_url_spec,
  description="Slack notifications",
  status="TEST_MODE"
)

响应

{"webhook": {
   "id":"1234567890",
   "creation_timestamp":1571440826026,
   "last_updated_timestamp":1582768296651,
   "status":"TEST_MODE",
   "events":["MODEL_VERSION_CREATED"],
   "http_url_spec": {
     "url": "https://hooks.slack.com/services/...",
     "enable_ssl_verification": True
}}}

还可以使用 Databricks Terraform 提供程序databricks_mlflow_webhook 创建 HTTP 注册表 Webhook。

2. 测试 Webhook

以前的 Webhook 是在 TEST_MODE 中创建的,因此可以触发模拟事件以向指定的 URL 发送请求。 但是,Webhook 不会在真实事件上触发。 测试终结点返回从指定 URL 接收到的状态代码和正文。

$ curl -X POST -H "Authorization: Bearer <access-token>" -d \
'{"id": "1234567890"}' \
https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/test
from databricks_registry_webhooks import RegistryWebhooksClient

http_webhook = RegistryWebhooksClient().test_webhook(
  id="1234567890"
)

响应

{
 "status":200,
 "body":"OK"
}

3. 将 Webhook 更新为活动状态

要为真实事件启用 Webhook,请通过更新调用将其状态设置为 ACTIVE,这还可用于更改其任何其他属性。

$ curl -X PATCH -H "Authorization: Bearer <access-token>" -d \
'{"id": "1234567890", "status": "ACTIVE"}' \
https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/update
from databricks_registry_webhooks import RegistryWebhooksClient

http_webhook = RegistryWebhooksClient().update_webhook(
  id="1234567890",
  status="ACTIVE"
)

响应

{"webhook": {
   "id":"1234567890",
   "creation_timestamp":1571440826026,
   "last_updated_timestamp":1582768296651,
   "status": "ACTIVE",
   "events":["MODEL_VERSION_CREATED"],
   "http_url_spec": {
     "url": "https://hooks.slack.com/services/...",
     "enable_ssl_verification": True
}}}

4. 删除 Webhook

要禁用 Webhook,请将其状态设置为 DISABLED(使用与上面类似的更新命令),或者将其删除。

$ curl -X DELETE -H "Authorization: Bearer <access-token>" -d \
'{"id": "1234567890"}' \
https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/delete
from databricks_registry_webhooks import RegistryWebhooksClient

http_webhook = RegistryWebhooksClient().delete_webhook(
  id="1234567890"
)

响应

{}

作业注册表 Webhook 示例工作流

用于管理作业注册表 Webhook 的工作流类似于 HTTP 注册表 Webhook,唯一的区别是 job_spec 字段取代了 http_url_spec 字段。

使用 Webhook,你可以在同一工作区或不同工作区中触发作业。 工作区是使用可选参数 workspace_url 指定的。 如果 workspace_url 不存在,则默认行为是在 Webhook 所在的同一工作区中触发作业。

要求

注意

作为安全最佳做法,在使用自动化工具、系统、脚本和应用进行身份验证时,Databricks 建议使用属于服务主体(而不是工作区用户)的个人访问令牌。 若要为服务主体创建令牌,请参阅管理服务主体的令牌

创建作业注册表 Webhook

$ curl -X POST -H "Authorization: Bearer <access-token>" -d \ '{"model_name": "<model-name>",
  "events": ["TRANSITION_REQUEST_CREATED"],
  "description": "Job webhook trigger",
  "status": "TEST_MODE",
  "job_spec": {
    "job_id": "1",
    "workspace_url": "https://my-databricks-workspace.com",
    "access_token": "dapi12345..."}}'
https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/create
from databricks_registry_webhooks import RegistryWebhooksClient, JobSpec

job_spec = JobSpec(
  job_id="1",
  workspace_url="https://my-databricks-workspace.com",
  access_token="dapi12345..."
)
job_webhook = RegistryWebhooksClient().create_webhook(
  model_name="<model-name>",
  events=["TRANSITION_REQUEST_CREATED"],
  job_spec=job_spec,
  description="Job webhook trigger",
  status="TEST_MODE"
)

响应

{"webhook": {
   "id":"1234567891",
   "creation_timestamp":1591440826026,
   "last_updated_timestamp":1591440826026,
   "status":"TEST_MODE",
   "events":["TRANSITION_REQUEST_CREATED"],
   "job_spec": {
     "job_id": "1",
     "workspace_url": "https://my-databricks-workspace.com"
}}}

还可以使用 Databricks Terraform 提供程序databricks_mlflow_webhook 创建作业注册表 Webhook。

列出注册表 Webhook 示例

$ curl -X GET -H "Authorization: Bearer <access-token>" -d \ '{"model_name": "<model-name>"}'
https://<databricks-instance>/api/2.0/mlflow/registry-webhooks/list
from databricks_registry_webhooks import RegistryWebhooksClient

webhooks_list = RegistryWebhooksClient().list_webhooks(model_name="<model-name>")

响应

{"webhooks": [{
   "id":"1234567890",
   "creation_timestamp":1571440826026,
   "last_updated_timestamp":1582768296651,
   "status": "ACTIVE",
   "events":["MODEL_VERSION_CREATED"],
   "http_url_spec": {
     "url": "https://hooks.slack.com/services/...",
     "enable_ssl_verification": True
}},
{
   "id":"1234567891",
   "creation_timestamp":1591440826026,
   "last_updated_timestamp":1591440826026,
   "status":"TEST_MODE",
   "events":["TRANSITION_REQUEST_CREATED"],
   "job_spec": {
     "job_id": "1",
     "workspace_url": "https://my-databricks-workspace.com"
}}]}

笔记本

MLflow 模型注册表 Webhook REST API 示例笔记本

获取笔记本

MLflow 模型注册表 Webhook Python 客户端示例笔记本

获取笔记本