什么是自动加载程序文件通知模式?

在文件通知模式下,自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 使用大型输入目录或大量文件时,文件通知模式相对于目录列表模式而言具有更高的性能和可伸缩性,但需要额外的云权限。

可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。

注意

Azure 高级存储帐户不支持文件通知模式,因为高级帐户不支持队列存储。

警告

文件通知模式不支持更改自动加载程序的源路径。 如果使用文件通知模式并更改了路径,则可能无法引入目录更新时新目录中已存在的文件。

自动加载程序文件通知模式中使用的云资源

重要

需要提升的权限才能自动为文件通知模式配置云基础结构。 请联系云管理员或工作区管理员。详见:

如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供附加选项,以进行创建这些资源所需的自动加载程序授权。

下表汇总了自动加载程序创建的资源。

云存储 订阅服务 队列服务 前缀 * 限制 **
Amazon S3 AWS SNS AWS SQS databricks-auto-ingest 每个 S3 存储桶 100 个
ADLS Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每存储帐户 500
Azure Blob 存储 Azure 事件网格 Azure 队列存储 Databricks(数据砖) 每存储帐户 500
  • 自动加载程序使用此前缀命名资源。

** 可以启动多少个并发文件通知管道

如果需要为给定存储帐户运行超过有限数量的文件通知管道,则可以:

  • 利用 AWS Lambda 或 Azure Functions 之类的服务,将来自单个队列(此队列侦听整个容器或 Bucket)的通知扇出,放入特定于目录的队列

文件通知事件

当文件上传到 S3 存储桶时,Amazon S3 都会提供一个 ObjectCreated 事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。

ADLS 为 Gen2 容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 自动加载程序流支持用于发现文件的 RenameFile 操作。 RenameFile 操作需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory 操作,以便发现文件。 RenameDirectory 操作需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

配置 ADLS 和 Azure Blob 存储的文件通知所需的权限

你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储

若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。

可以使用以下方法之一进行身份验证:

获取身份验证凭据后,向 Databricks 访问连接器(对于服务凭据)或Microsoft Entra ID 应用(对于服务主体)分配必要的权限。

  • 使用 Azure 内置角色

    为访问连接器分配输入路径所在的存储帐户的以下角色:

    • 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
    • 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,才需要此角色。

    请为相关资源组中的此访问连接器分配以下角色:

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 使用自定义角色

    如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    然后,可以将此自定义角色分配给访问连接器。

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

自动加载器权限设置

为 Amazon S3 配置文件通知所需的权限

你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息

若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。 需要此标识和访问管理角色,才能为自动加载程序创建用于身份验证的服务凭据。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

其中:

  • <bucket-name>:流将在其中读取文件的 S3 存储桶名称,例如,auto-logs。 可以使用 * 作为通配符,例如 databricks-*-logs。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行 %fs mounts 列出笔记本中的所有 DBFS 装入点。
  • <region>:S3 存储桶所在的 AWS 区域,例如 cn-north-2。 如果你不想要指定区域,请使用 *
  • <account-number>:拥有 S3 存储桶的 AWS 帐号,例如 123456789012。 如果你不想要指定帐号,请使用 *

SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*cloudFiles 源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。

注意

上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果你需要添加 S3 只读权限,请在 JSON 文档的 Action 语句的 DatabricksAutoLoaderSetup 列表中添加以下内容:

  • s3:ListBucket
  • s3:GetObject

初始设置后权限减少

上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。

重要

权限降低后,无法在出现故障时(例如,SQS 队列被意外删除)启动新的流式处理查询或重新创建资源;也无法使用云资源管理 API 来列出或拆卸资源。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

手动配置或管理文件通知资源

特权用户可以手动配置或管理文件通知资源。

  • 通过云提供商手动设置文件通知服务,并手动指定队列标识符。 有关更多详细信息,请参阅文件通知选项
  • 使用 Scala API 创建或管理通知和队列服务,如以下示例所示:

注意

必须要具有适当权限才能配置或修改云基础结构。 请参阅 AzureS3GCS 的权限文档。

Python语言

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala(编程语言)

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 创建名为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于自动加载程序文件通知模式中使用的云资源中总结的存储系统)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用cloudFiles中的标识符将该标识符传递给 源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。

只有调用 "path" 时才需要提供 newManagersetUpNotificationServices 选项;对于 listNotificationServicestearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path

以下矩阵指出了每种类型的存储在哪个 Databricks 运行时中支持哪些 API 方法:

云存储 安装程序 API 列出 API 拆解 API
Amazon S3 所有版本 所有版本 所有版本
ADLS 所有版本 所有版本 所有版本
Azure Blob 存储 所有版本 所有版本 所有版本

清理自动加载程序创建的事件通知资源

自动加载程序不会自动关闭文件通知资源。 若要拆解文件通知资源,必须使用云资源管理器,如上一部分所示。 还可以使用云提供商的 UI 或 API 手动删除这些资源。

解决常见问题

本节介绍将自动加载器与文件通知模式配合使用时出现的常见错误,以及如何解决它们。

无法创建事件网格订阅

如果第一次运行自动加载器时看到以下错误消息,则表示事件网格未在 Azure 订阅中注册为资源提供程序。

java.lang.RuntimeException: Failed to create event grid subscription.

若要将事件网格注册为资源提供程序,请执行下列操作:

  1. 在 Azure 门户中前往你的订阅。

  2. 单击“设置”部分的“资源提供程序”。

  3. 注册提供程序 Microsoft.EventGrid

执行事件网格订阅操作所需的授权

如果在首次运行自动加载器时看到以下错误消息,请确认为事件网格和存储帐户的服务主体分配了参与者角色。

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

事件网格客户端绕过代理

在 Databricks Runtime 15.2 及更高版本中,自动加载器中的事件网格连接默认使用系统属性中的代理设置。 在 Databricks Runtime 13.3 LTS、14.3 LTS 和 15.0 到 15.2 中,可以通过设置 Spark 配置属性 spark.databricks.cloudFiles.eventGridClient.useSystemProperties true 来手动配置事件网格连接以使用代理。 请参阅在 Azure Databricks 上设置 Spark 配置属性