本页介绍如何将自动加载程序流配置为使用文件通知模式以增量方式发现和引入云数据。
在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 与目录列表模式相比,文件通知模式的性能更高且可缩放。
可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。
注意
Azure 高级存储帐户不支持文件通知模式,因为高级帐户不支持队列存储。
警告
文件通知模式不支持更改自动加载程序的源路径。 如果使用文件通知模式并更改了路径,则可能无法引入目录更新时新目录中已存在的文件。
在外部位置启用或不启用文件事件时的文件通知模式
可通过两种方法将自动加载程序配置为使用文件通知模式:
- 旧文件通知模式:单独管理每个 Auto Loader 数据流的文件通知队列。 自动加载程序自动设置通知服务和队列服务,该服务订阅输入目录中的文件事件。 - 这是旧方法。 
- (推荐)文件事件(公开预览):在处理 Unity 目录中定义的任何外部位置文件的所有流时,使用一个 Azure Databricks 托管的文件通知队列。 - 此方法要求为外部文件路径启用事件功能。 它比旧方法具有以下优势: - Azure Databricks 可以为你设置云存储帐户中的订阅和文件事件,而无需使用服务凭据或其他特定于云的身份验证选项向自动加载程序提供其他凭据。 请参阅(推荐)为外部位置启用文件事件。
- 您需要在云存储帐户中创建的 Azure 托管标识策略数量减少。
- 由于不再需要为每个自动加载程序流创建队列,因此更容易避免达到 旧版自动加载程序文件通知模式中使用的云资源中列出的云提供程序通知限制。
- Azure Databricks 会自动管理资源要求的优化,因此无需优化参数,例如 cloudFiles.fetchParallelism。
- 清理功能意味着您无需过于担心在云中创建的通知的生命周期,特别是在删除流或完全刷新的情况下。
 
Databricks 建议,如果在当前目录列表模式下使用自动加载程序,则迁移到文件通知模式并显示文件事件,以查看显著的性能改进。
将文件通知模式与文件事件配合使用
本部分介绍如何创建和更新自动加载程序流以使用文件事件。
重要
自动加载器对文件事件的支持目前处于公共预览版。
在您开始之前
设置文件事件需要:
- 为 Unity Catalog 启用的 Azure Databricks 工作区。
- 在 Unity 目录中创建存储凭据和外部位置对象的权限。
具有文件事件的自动加载程序流需要:
- Databricks Runtime 14.3 LTS 或更高版本上的计算。
创建或更新使用文件事件的自动加载流之前:
- 检查您是否已有从外部位置获取数据的现有基于通知的自动加载程序流。 如果这样做,请将其关闭并删除关联的通知资源。
配置说明
以下说明适用于是创建新的自动加载程序流还是迁移现有流,以将升级的文件通知模式与文件事件一起使用:
- 为外部位置启用文件事件。 请参阅(推荐)为外部位置启用文件事件。 
- 创建新的自动加载程序流或编辑现有的自动加载程序流以使用外部位置时: - 如果现有的 基于通知的自动加载程序流 使用来自外部位置的数据,请将其关闭并删除关联的通知资源。
- 请确保未设置pathRewrites(这不是一个常见选项)。
- 查看自动加载程序在使用文件事件管理文件通知时不考虑的设置列表。 避免在新的自动加载程序流中使用它们,并从您正在迁移到此模式的现有流中删除它们。
- 在自动加载程序代码中将选项cloudFiles.useManagedFileEvents设置为true。
 
例如:
autoLoaderStream = (spark.readStream
    .format("cloudFiles")
    ...
    .options("cloudFiles.useManagedFileEvents", True)
    ...)
如果您使用的是 Lakeflow 声明性管道,并且已有一个包含流式处理表的管道,请更新它以包含useManagedFileEvents 选项:
CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );
不支持的自动加载程序设置
当流使用文件事件时,不支持以下自动加载程序设置:
| 设置 | 改变 | 
|---|---|
| useIncremental | 不再需要在文件通知的效率与目录列表的简单性之间做出决定。 带有文件事件的自动加载程序只有一种模式。 | 
| useNotifications | 每个外部位置只有一个队列和存储事件订阅。 | 
| cloudFiles.fetchParallelism | 具有文件事件的自动加载程序不提供手动并行优化。 | 
| cloudFiles.backfillInterval | Azure Databricks 自动处理为文件事件启用的外部位置的回填。 | 
| cloudFiles.pathRewrites | 仅当将外部数据位置装载到已弃用的 DBFS 时,此选项才适用。 | 
| resourceTags | 应使用云控制台设置资源标记。 | 
文件事件的自动加载程序的限制
文件事件服务通过缓存最近创建的文件来优化文件发现。 如果自动加载程序不经常运行,则此缓存可能会过期,自动加载程序会回退到目录列表以发现文件和更新缓存。 为了避免这种情况,请至少每七天调用一次自动加载程序。
有关文件事件限制的常规列表,请参阅 文件事件限制。
单独管理每个自动加载程序流的文件通知队列(旧版)
重要
需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:
旧版自动加载程序文件通知模式中使用的云资源
如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。
下表列出了由自动加载程序为每个云提供商创建的资源。
| 云存储 | 订阅服务 | 队列服务 | 前缀 * | 限制 ** | 
|---|---|---|---|---|
| ADLS | Azure 事件网格 | Azure 队列存储 | Databricks(数据砖) | 每存储帐户 500 | 
| Azure Blob 存储 | Azure 事件网格 | Azure 队列存储 | Databricks(数据砖) | 每存储帐户 500 | 
* 自动加载程序用此前缀命名资源。
** 可以启动多少个并发文件通知管道
如果您必须运行比这些限制更多的基于文件通知的自动加载器流,则可以使用 文件事件 或 AWS Lambda、Azure Functions 等服务,将侦听整个容器或存储桶的单个队列的通知分散到多个特定目录的队列中。
旧版文件通知事件
Azure Data Lake Storage 为存储容器中显示的文件提供不同的事件通知。
- 自动加载程序会侦听 FlushWithClose事件,以便处理某个文件。
- 自动加载程序流支持用于发现文件的 RenameFile操作。RenameFile操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
- 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory操作,以便发现文件。RenameDirectory操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。
注意
云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。
配置 Azure Data Lake Storage 和 Azure Blob 存储的文件通知所需的权限
你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储。
若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。
可以使用以下方法之一进行身份验证:
- 在 Databricks Runtime 16.1 及更高版本中:Databricks 服务凭据(建议):使用托管标识和 Databricks 访问连接器创建 服务凭据 。
- 服务主体:以客户端 ID 和客户端密码的形式创建 Microsoft Entra ID(前 Azure Active Directory)应用和服务主体。
获取身份验证凭据后,请将必要的权限分配给 Databricks 访问连接器(用于服务凭据)或 Microsoft Entra ID 应用(对于服务主体)。
- 使用 Azure 内置角色 - 为访问连接器分配输入路径所在的存储帐户的以下角色: - 请为相关资源组中的此访问连接器分配以下角色: - EventGrid EventSubscription Contributor:此角色用于执行 Azure 事件网格(事件网格)订阅操作,例如创建或列出事件订阅。
 - 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。 
- 使用自定义角色 - 如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出: - "permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]- 然后,可以将此自定义角色分配给访问连接器。 - 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色。 
               
              
            
手动配置或管理文件通知资源
特权用户可以手动配置或管理文件通知资源。
- 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项。
- 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#######################################
## Creating a ResourceManager in Azure
#######################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()
# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala(编程语言)
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()
/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
使用 setUpNotificationServices(<resource-suffix>) 来创建名称为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于存储系统,该系统汇总在 旧版自动加载程序文件通知模式下使用的云资源中)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用cloudFiles中的标识符将该标识符传递给  源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。
只有调用 "path" 时才需要提供 newManager 的 setUpNotificationServices 选项;对于 listNotificationServices 或 tearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path。
以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:
| 云存储 | 安装程序 API | 列出 API | 拆解 API | 
|---|---|---|---|
| ADLS | 所有版本 | 所有版本 | 所有版本 | 
| Azure Blob 存储 | 所有版本 | 所有版本 | 所有版本 | 
清理自动加载程序创建的事件通知资源
自动加载程序不会自动关闭文件通知资源。 若要拆解文件通知资源,必须使用云资源管理器,如上一部分所示。 还可以使用云提供商的 UI 或 API 手动删除这些资源。
解决常见问题
本节介绍将自动加载器与文件通知模式配合使用时出现的常见错误,以及如何解决它们。
无法创建事件网格订阅
如果第一次运行自动加载器时看到以下错误消息,则表示事件网格未在 Azure 订阅中注册为资源提供程序。
java.lang.RuntimeException: Failed to create event grid subscription.
若要将事件网格注册为资源提供程序,请执行下列操作:
- 在 Azure 门户中前往你的订阅。 
- 单击“设置”部分的“资源提供程序”。 
- 注册提供程序 - Microsoft.EventGrid。
执行事件网格订阅操作所需的授权
如果在首次运行自动加载器时看到以下错误消息,请确认为事件网格和存储帐户的服务主体分配了参与者角色。
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
事件网格客户端绕过代理
在 Databricks Runtime 15.2 及更高版本中,自动加载器中的事件网格连接默认使用系统属性中的代理设置。 在 Databricks Runtime 13.3 LTS、14.3 LTS 和 15.0 到 15.2 中,可以通过设置 Spark 配置属性 spark.databricks.cloudFiles.eventGridClient.useSystemProperties true 来手动配置事件网格连接以使用代理。 请参阅在 Azure Databricks 上设置 Spark 配置属性。