在文件通知模式下配置自动加载程序流

本页介绍如何将自动加载程序流配置为使用文件通知模式以增量方式发现和引入云数据。

在文件通知模式下,Auto Loader 可以自动设置和管理通知服务和队列服务,以订阅输入目录中的文件事件。 可以通过文件通知扩展自动加载器,每小时处理数百万个文件。 与目录列表模式相比,文件通知模式更快且更具可缩放性。 此外,可以随时在文件通知和目录列表之间切换,并且仍保持完全一次性的数据处理保证。

有关所有自动加载程序配置设置(包括文件通知选项和特定于云的身份验证选项)的完整参考,请参阅 自动加载程序选项

注意

尽管文件事件的文件通知模式提高了成本和可伸缩性,但它不能保证发现或处理文件的顺序。 设计流程以处理文件的无序到达。 有关指南,请参阅 处理无序数据

注意

自动加载程序不支持Azure高级存储帐户的文件通知模式,因为高级帐户不支持队列存储。

在外部位置启用或不启用文件事件时的文件通知模式

可通过两种方法将自动加载程序配置为使用文件通知模式:

  • (建议) 文件事件:对处理来自给定 外部位置的文件的所有流使用单个文件通知队列。 此方法在经典文件通知模式下具有以下优势:
    • Azure Databricks可以在您的云存储账户中设置订阅和文件事件,而无需使用服务凭据或其他云特定身份验证选项向Auto Loader提供额外凭据。 请参阅 为外部位置设置文件事件
    • 在您的云存储帐户中,需要创建的 Azure 托管身份策略更少。
    • 由于不再需要为每个自动加载程序流创建队列,因此更容易避免达到 经典自动加载程序文件通知模式中使用的云资源中列出的云提供程序通知限制。
    • Azure Databricks自动管理资源要求的优化,因此无需优化参数,例如 cloudFiles.fetchParallelism
    • 清理功能意味着您无需过于担心在云中创建的通知的生命周期,特别是在删除流或完全刷新的情况下。
  • 经典文件通知模式:单独管理每个自动加载程序流的文件通知队列。 自动加载程序自动设置通知服务和队列服务,该服务订阅输入目录中的文件事件。 这是经典方法。

如果在目录列表模式下使用自动加载程序,Databricks 建议使用文件事件迁移到文件通知模式。 具有文件事件的自动加载程序提供显著的性能改进。 首先为外部位置 启用文件事件 ,然后在自动加载程序流配置中设置 cloudFiles.useManagedFileEvents

将文件通知模式与文件事件配合使用

本部分介绍如何创建和更新自动加载程序流以使用文件事件。 使用文件通知模式时,Databricks 强烈建议执行以下操作:

  • 使用 Unity 目录卷:为每个自动加载程序从中加载数据的路径或子目录创建单独的 外部卷 。 向自动加载程序提供卷路径(例如 /Volumes/catalog/schema/volume),而不是云存储 URL(例如 s3://bucket/path)。 这提高了文件发现性能,因为文件事件服务可以将文件发现范围限定为仅相关对象,而不是循环访问外部位置中的所有对象。
  • 为每个子路径使用单独的卷:如果多个自动加载流从同一外部位置下的不同子路径进行读取,请为每个子路径创建专用卷,而不是共享单个卷。 这避免了不必要的文件发现开销,并有助于防止速率限制问题。

在您开始之前

设置文件事件需要:

  • 启用于 Unity Catalog 的 Azure Databricks 工作区。
  • 在 Unity 目录中创建存储凭据和外部位置对象的权限。

具有文件事件的自动加载程序流需要:

  • Databricks Runtime 14.3 LTS 或更高版本上的计算。

配置说明

以下说明适用于是创建新的自动加载程序流还是迁移现有流,以将升级的文件通知模式与文件事件一起使用:

  1. 在 Unity 目录中创建 存储凭据外部位置 ,以授予对自动加载程序流云存储中源位置的访问权限。

  2. 为外部位置启用文件事件。 请参阅 为外部位置设置文件事件

  3. 创建新的自动加载程序流或编辑现有的自动加载程序流以使用外部位置时:

    • 如果现有的 基于通知的自动加载程序流 使用来自外部位置的数据,请将其关闭并删除关联的通知资源。
    • 请确保未设置pathRewrites(这不是一个常见选项)。
    • 查看自动加载程序在使用文件事件管理文件通知时不考虑的设置列表。 避免在新的自动加载程序流中使用它们,并从您正在迁移到此模式的现有流中删除它们。
    • 在自动加载程序代码中将选项cloudFiles.useManagedFileEvents设置为true

例如:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

如果您正在使用 Lakeflow Spark 声明式管道,并且已有一个包含流处理表的管道,请将其更新以包含 useManagedFileEvents 选项。

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

不支持的自动加载程序设置

当流使用文件事件时,不支持以下自动加载程序设置:

设置 改变
useIncremental 不再需要在文件通知的效率与目录列表的简单性之间做出决定。 带有文件事件的自动加载程序只有一种模式。
useNotifications 每个外部位置只有一个队列和存储事件订阅。
cloudFiles.fetchParallelism 具有文件事件的自动加载程序不提供手动并行优化。
cloudFiles.backfillInterval Azure Databricks自动处理为文件事件启用的外部位置的回填。
cloudFiles.pathRewrites 仅当将外部数据位置装载到已弃用的 DBFS 时,此选项才适用。
resourceTags 应使用云控制台设置资源标记。

文件事件的自动加载程序的限制

文件事件服务通过缓存最近创建的文件来优化文件发现。 如果自动加载程序不经常运行,则此缓存可能会过期,自动加载程序会回退到目录列表以发现文件和更新缓存。 为了避免这种情况,请至少每七天调用一次自动加载程序。

有关文件事件限制的常规列表,请参阅 文件事件限制

单独管理每个自动加载程序流的文件通知队列(经典)

在经典文件通知模式下,自动加载程序会自动为每个流设置专用通知服务和队列。 此方法要求你管理每个流的通知队列,并提供用于创建云资源的身份验证凭据。 Databricks 建议使用新工作负载的文件通知模式。

重要

需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:

在经典文件通知模式下,自动加载程序会自动为订阅输入目录中的文件事件的每个流设置通知服务和队列服务。 您分别管理每个自动加载器流的通知队列。

警告

自动加载程序不支持在经典文件通知模式下 更改源路径 。 如果更改路径,则可能无法在路径更新时引入新位置中已存在的文件。

经典自动加载程序文件通知模式中使用的云资源

如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。

下表列出了自动加载程序为每个云提供商创建的资源。

云存储 订阅服务 队列服务 前缀 * 限制 **
ADLS Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每个存储账户 500
Azure Blob 存储 Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每个存储账户 500

* 自动加载程序用此前缀命名资源。

** 可以启动多少个并发文件通知管道

如果您必须运行比这些限制允许的更多基于文件通知的自动加载器流,可以使用文件事件或Azure Functions等服务,将来自侦听整个容器或存储桶的单个队列的通知分发到特定目录的队列中。

经典文件通知事件

Azure Data Lake Storage为存储容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 自动加载程序数据流支持用于发现文件的 RenameFile 操作。 RenameFile 操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的 Auto Loader 数据流支持 RenameDirectory 操作来发现文件。 RenameDirectory 操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

配置Azure Data Lake Storage和Azure Blob Storage的文件通知所需的权限

你必须具有对输入目录的读取权限。 请参阅 Azure Blob Storage

若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。 可以使用以下方法之一进行身份验证:

获取身份验证凭据后,使用以下方法之一分配必要的权限:

  • Azure内置角色:

    • 为输入路径所在的存储帐户分配以下角色的访问连接器:
      • 参与者:此角色用于在存储帐户中设置资源,例如队列和事件订阅。
      • 存储队列数据参与者:此角色用于执行队列操作,例如从队列中检索和删除消息。 仅当提供没有连接字符串的服务主体时,才需要此角色。
    • 将以下角色分配给访问连接器的相关资源组:
  • 自定义角色:如果担心授予上述角色所需的权限,可以改为使用以下权限创建自定义 角色 。 创建角色后,将其分配给访问连接器。 有关详细信息,请参阅使用 Azure 门户分配角色

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    自动加载器权限设置

手动配置或管理文件通知资源

特权用户可以手动配置或管理文件通知资源。 若要通过云提供商设置文件通知服务并指定队列标识符,请参阅 文件通知选项。 若要使用 Scala API 创建或管理通知和队列服务,请改为:

注意

必须要具有适当权限才能配置或修改云基础结构。 请参阅 AzureS3GCS

步骤 1:在 Azure 中创建 ResourceManager

Python

# Create a ResourceManager in Azure

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

Scala(编程语言)

// Create a ResourceManager in Azure

import com.databricks.sql.CloudFilesAzureResourceManager

// Using a Databricks service credential
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

// Using an Azure service principal
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

步骤 2:使用资源管理器设置、查看和关闭文件通知服务

Python

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>.
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala(编程语言)

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 来创建一个名称为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于 经典自动加载程序文件通知模式中使用的云资源所汇总的存储系统)。 如果存在同名的现有资源,Azure Databricks重复使用现有资源,而不是创建新的资源。 此函数返回一个队列标识符,您可以将该标识符通过cloudFiles传递给 文件通知选项 中的源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。

仅当调用"path"时,提供newManager选项给setUpNotificationServices。 它不需要 listNotificationServicestearDownNotificationServices。 这是你在执行流式处理查询时使用的同一 path

以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:

云存储 设置 API 列出 API 拆解 API
ADLS 所有版本 所有版本 所有版本
Azure Blob 存储 所有版本 所有版本 所有版本

清理自动加载程序创建的事件通知资源

自动加载程序不会自动关闭文件通知资源。 若要拆解文件通知资源,必须使用云资源管理器,如上一部分所示。 还可以使用云提供商的 UI 或 API 手动删除这些资源。

解决常见问题

本节介绍将自动加载器与文件通知模式配合使用时出现的常见错误,以及如何解决它们。

无法创建事件网格订阅

如果在首次运行自动加载程序时看到以下错误消息,则尚未在Azure订阅中将事件网格注册为资源提供程序。

java.lang.RuntimeException: Failed to create event grid subscription.

若要将事件网格注册为资源提供程序,请执行下列操作:

  1. 在Azure门户中,转到订阅。

  2. 在“设置”部分选择 “资源提供程序 ”。

  3. 注册供应商Microsoft.EventGrid

执行事件网格订阅操作所需的授权

如果在首次运行自动加载器时看到以下错误消息,请确认为事件网格和存储帐户的服务主体分配了参与者角色。

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

事件网格客户端绕过代理

在 Databricks Runtime 15.2 及更高版本中,自动加载器中的事件网格连接默认使用系统属性中的代理设置。 在 Databricks Runtime 13.3 LTS、14.3 LTS 和 15.0 到 15.2 中,可以通过设置 Spark 配置属性 spark.databricks.cloudFiles.eventGridClient.useSystemProperties true 来手动配置事件网格连接以使用代理。 请参阅 在 Azure Databricks 上设置 Spark 配置属性

请求过多

如果在自动加载程序流日志中看到以下错误消息,则表明流超出了 Databricks 文件事件服务的速率限制:

com.databricks.sql.util.UnexpectedHttpStatus: Too many requests. Please wait a moment and try again.

当多个自动加载程序流从同一外部位置下的不同子路径读取,而不使用 Unity 目录卷时,通常会发生这种情况。 文件事件服务必须循环访问外部位置中的所有对象,以查找每个流的相关文件,从而导致 API 调用过多。 若要解决此问题,请按照文件通知模式与文件事件一起使用中描述的建议来进行操作。

其他资源